lazy Future construction for AsyncDerived

This commit is contained in:
Greg Johnston 2024-05-20 08:03:14 -04:00
parent d360cc280f
commit 961bf89a8b
2 changed files with 71 additions and 10 deletions

View file

@ -12,7 +12,8 @@ use futures::Future;
use hydration_context::SerializedDataId; use hydration_context::SerializedDataId;
use reactive_graph::{ use reactive_graph::{
computed::{ computed::{
ArcAsyncDerived, ArcMemo, AsyncDerived, AsyncDerivedFuture, AsyncState, ArcAsyncDerived, ArcAsyncDerivedFuture, ArcMemo, AsyncDerived,
AsyncDerivedFuture, AsyncState,
}, },
graph::{Source, ToAnySource, ToAnySubscriber}, graph::{Source, ToAnySource, ToAnySubscriber},
owner::Owner, owner::Owner,
@ -157,6 +158,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_with_encoding<S, Fut>( pub fn new_with_encoding<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -241,7 +243,7 @@ where
T: Clone + 'static, T: Clone + 'static,
{ {
type Output = T; type Output = T;
type IntoFuture = AsyncDerivedFuture<T>; type IntoFuture = ArcAsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture { fn into_future(self) -> Self::IntoFuture {
self.data.into_future() self.data.into_future()
@ -281,6 +283,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new<S, Fut>( pub fn new<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -300,6 +303,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_serde<S, Fut>( pub fn new_serde<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -320,6 +324,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_serde_wb<S, Fut>( pub fn new_serde_wb<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -340,6 +345,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_miniserde<S, Fut>( pub fn new_miniserde<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -360,6 +366,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_serde_lite<S, Fut>( pub fn new_serde_lite<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -380,6 +387,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_rkyv<S, Fut>( pub fn new_rkyv<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -400,6 +408,7 @@ where
T::SerErr: Debug, T::SerErr: Debug,
T::DeErr: Debug, T::DeErr: Debug,
{ {
#[track_caller]
pub fn new_with_encoding<S, Fut>( pub fn new_with_encoding<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static, source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static, fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
@ -425,6 +434,7 @@ where
type Output = T; type Output = T;
type IntoFuture = AsyncDerivedFuture<T>; type IntoFuture = AsyncDerivedFuture<T>;
#[track_caller]
fn into_future(self) -> Self::IntoFuture { fn into_future(self) -> Self::IntoFuture {
self.data.into_future() self.data.into_future()
} }

View file

@ -1,10 +1,12 @@
use super::{ArcAsyncDerived, AsyncState}; use super::{ArcAsyncDerived, AsyncDerived, AsyncState};
use crate::{ use crate::{
graph::{AnySource, ToAnySource}, graph::{AnySource, ToAnySource},
signal::guards::Plain, signal::guards::Plain,
traits::Track, traits::{DefinedAt, Track},
unwrap_signal,
}; };
use or_poisoned::OrPoisoned; use or_poisoned::OrPoisoned;
use pin_project_lite::pin_project;
use std::{ use std::{
future::{Future, IntoFuture}, future::{Future, IntoFuture},
pin::Pin, pin::Pin,
@ -14,13 +16,13 @@ use std::{
/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading, /// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
/// but does not contain its value. /// but does not contain its value.
pub struct AsyncDerivedReadyFuture<T> { pub struct ArcAsyncDerivedReadyFuture<T> {
pub(crate) source: AnySource, pub(crate) source: AnySource,
pub(crate) value: Arc<RwLock<AsyncState<T>>>, pub(crate) value: Arc<RwLock<AsyncState<T>>>,
pub(crate) wakers: Arc<RwLock<Vec<Waker>>>, pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
} }
impl<T: 'static> Future for AsyncDerivedReadyFuture<T> { impl<T: 'static> Future for ArcAsyncDerivedReadyFuture<T> {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -38,7 +40,7 @@ impl<T: 'static> Future for AsyncDerivedReadyFuture<T> {
/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading, /// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
/// and contains its value. /// and contains its value.
pub struct AsyncDerivedFuture<T> { pub struct ArcAsyncDerivedFuture<T> {
source: AnySource, source: AnySource,
value: Arc<RwLock<AsyncState<T>>>, value: Arc<RwLock<AsyncState<T>>>,
wakers: Arc<RwLock<Vec<Waker>>>, wakers: Arc<RwLock<Vec<Waker>>>,
@ -49,10 +51,10 @@ where
T: Clone + 'static, T: Clone + 'static,
{ {
type Output = T; type Output = T;
type IntoFuture = AsyncDerivedFuture<T>; type IntoFuture = ArcAsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture { fn into_future(self) -> Self::IntoFuture {
AsyncDerivedFuture { ArcAsyncDerivedFuture {
source: self.to_any_source(), source: self.to_any_source(),
value: Arc::clone(&self.value), value: Arc::clone(&self.value),
wakers: Arc::clone(&self.wakers), wakers: Arc::clone(&self.wakers),
@ -62,7 +64,7 @@ where
// this is implemented to output T by cloning it because read guards should not be held across // this is implemented to output T by cloning it because read guards should not be held across
// .await points, and it's way too easy to trip up by doing that! // .await points, and it's way too easy to trip up by doing that!
impl<T> Future for AsyncDerivedFuture<T> impl<T> Future for ArcAsyncDerivedFuture<T>
where where
T: Clone + 'static, T: Clone + 'static,
{ {
@ -82,3 +84,52 @@ where
} }
} }
} }
pin_project! {
/// A [`Future`] that is ready when an [`AsyncDerived`] is finished loading or reloading,
/// and contains its value.
pub struct AsyncDerivedFuture<T> {
this: AsyncDerived<T>,
#[pin]
inner: Option<ArcAsyncDerivedFuture<T>>,
}
}
impl<T> IntoFuture for AsyncDerived<T>
where
T: Send + Sync + Clone + 'static,
{
type Output = T;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
AsyncDerivedFuture {
this: self,
inner: None,
}
}
}
// this is implemented to output T by cloning it because read guards should not be held across
// .await points, and it's way too easy to trip up by doing that!
impl<T> Future for AsyncDerivedFuture<T>
where
T: Send + Sync + Clone + 'static,
{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if this.inner.is_none() {
let stored = *this.this;
this.inner.set(Some(
stored
.inner
.get()
.unwrap_or_else(unwrap_signal!(stored))
.into_future(),
));
}
this.inner.as_pin_mut().unwrap().poll(cx)
}
}