feat: local resources with .await

This commit is contained in:
Greg Johnston 2024-07-04 15:39:42 -04:00
parent d5aecbe705
commit dc21e4ff53
12 changed files with 723 additions and 130 deletions

View file

@ -51,51 +51,63 @@ pub fn App() -> impl IntoView {
// out-of-order
<ParentRoute
path=StaticSegment("out-of-order")
view=|| view! {
<SecondaryNav/>
<h1>"Out-of-Order"</h1>
<Outlet/>
view=|| {
view! {
<SecondaryNav/>
<h1>"Out-of-Order"</h1>
<Outlet/>
}
}
>
<Route path=StaticSegment("") view=Nested/>
<Route path=StaticSegment("inside") view=NestedResourceInside/>
<Route path=StaticSegment("single") view=Single/>
<Route path=StaticSegment("parallel") view=Parallel/>
<Route path=StaticSegment("inside-component") view=InsideComponent/>
<Route path=StaticSegment("local") view=LocalResource/>
<Route path=StaticSegment("none") view=None/>
</ParentRoute>
// in-order
<ParentRoute
path=StaticSegment("in-order")
ssr=SsrMode::InOrder
view=|| view! {
<SecondaryNav/>
<h1>"In-Order"</h1>
<Outlet/>
view=|| {
view! {
<SecondaryNav/>
<h1>"In-Order"</h1>
<Outlet/>
}
}
>
<Route path=StaticSegment("") view=Nested/>
<Route path=StaticSegment("inside") view=NestedResourceInside/>
<Route path=StaticSegment("single") view=Single/>
<Route path=StaticSegment("parallel") view=Parallel/>
<Route path=StaticSegment("inside-component") view=InsideComponent/>
<Route path=StaticSegment("local") view=LocalResource/>
<Route path=StaticSegment("none") view=None/>
</ParentRoute>
// async
<ParentRoute
path=StaticSegment("async")
ssr=SsrMode::Async
view=|| view! {
<SecondaryNav/>
<h1>"Async"</h1>
<Outlet/>
view=|| {
view! {
<SecondaryNav/>
<h1>"Async"</h1>
<Outlet/>
}
}
>
<Route path=StaticSegment("") view=Nested/>
<Route path=StaticSegment("inside") view=NestedResourceInside/>
<Route path=StaticSegment("single") view=Single/>
<Route path=StaticSegment("parallel") view=Parallel/>
<Route path=StaticSegment("inside-component") view=InsideComponent/>
<Route path=StaticSegment("local") view=LocalResource/>
<Route path=StaticSegment("none") view=None/>
</ParentRoute>
</Routes>
@ -108,11 +120,16 @@ pub fn App() -> impl IntoView {
fn SecondaryNav() -> impl IntoView {
view! {
<nav>
<A href="" exact=true>"Nested"</A>
<A href="inside" exact=true>"Nested (resource created inside)"</A>
<A href="" exact=true>
"Nested"
</A>
<A href="inside" exact=true>
"Nested (resource created inside)"
</A>
<A href="single">"Single"</A>
<A href="parallel">"Parallel"</A>
<A href="inside-component">"Inside Component"</A>
<A href="local">"Local Resource"</A>
<A href="none">"No Resources"</A>
</nav>
}
@ -126,21 +143,28 @@ fn Nested() -> impl IntoView {
view! {
<div>
<Suspense fallback=|| "Loading 1...">
<Suspense fallback=|| {
"Loading 1..."
}>
{move || {
one_second.get().map(|_| view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
})
one_second.get().map(|_| view! { <p id="loaded-1">"One Second: Loaded 1!"</p> })
}}
<Suspense fallback=|| "Loading 2...">
<Suspense fallback=|| {
"Loading 2..."
}>
{move || {
two_second.get().map(|_| view! {
<p id="loaded-2">"Two Second: Loaded 2!"</p>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
})
two_second
.get()
.map(|_| {
view! {
<p id="loaded-2">"Two Second: Loaded 2!"</p>
<button on:click=move |_| {
set_count.update(|n| *n += 1)
}>{count}</button>
}
})
}}
</Suspense>
</Suspense>
</div>
@ -154,25 +178,27 @@ fn NestedResourceInside() -> impl IntoView {
view! {
<div>
<Suspense fallback=|| "Loading 1...">
<Suspense fallback=|| {
"Loading 1..."
}>
{Suspend(async move {
_ = one_second.await;
let two_second = Resource::new(|| (), move |_| async move {
second_wait_fn(WAIT_TWO_SECONDS).await
});
let two_second = Resource::new(
|| (),
move |_| async move { second_wait_fn(WAIT_TWO_SECONDS).await },
);
view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
<Suspense fallback=|| "Loading 2...">
<span id="loaded-2">
"Loaded 2 (created inside first suspense)!: "
{Suspend(async move { format!("{:?}", two_second.await)})}
{Suspend(async move { format!("{:?}", two_second.await) })}
</span>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
<button on:click=move |_| set_count.update(|n| *n += 1)>{count}</button>
</Suspense>
}
})}
</Suspense>
</div>
}
@ -186,25 +212,42 @@ fn Parallel() -> impl IntoView {
view! {
<div>
<Suspense fallback=|| "Loading 1...">
<Suspense fallback=|| {
"Loading 1..."
}>
{move || {
one_second.get().map(move |_| view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
})
one_second
.get()
.map(move |_| {
view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
<button on:click=move |_| {
set_count.update(|n| *n += 1)
}>{count}</button>
}
})
}}
</Suspense>
<Suspense fallback=|| "Loading 2...">
<Suspense fallback=|| {
"Loading 2..."
}>
{move || {
two_second.get().map(move |_| view! {
<p id="loaded-2">"Two Second: Loaded 2!"</p>
<button id="second-count" on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
})
two_second
.get()
.map(move |_| {
view! {
<p id="loaded-2">"Two Second: Loaded 2!"</p>
<button
id="second-count"
on:click=move |_| set_count.update(|n| *n += 1)
>
{count}
</button>
}
})
}}
</Suspense>
</div>
}
@ -217,18 +260,17 @@ fn Single() -> impl IntoView {
view! {
<div>
<Suspense fallback=|| "Loading 1...">
{move || {
one_second.get().map(|_| view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
})
}}
<Suspense fallback=|| {
"Loading 1..."
}>
{move || {
one_second.get().map(|_| view! { <p id="loaded-1">"One Second: Loaded 1!"</p> })
}}
</Suspense>
<p id="following-message">"Children following Suspense should hydrate properly."</p>
<div>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
<button on:click=move |_| set_count.update(|n| *n += 1)>{count}</button>
</div>
</div>
}
@ -244,9 +286,7 @@ fn InsideComponent() -> impl IntoView {
<InsideComponentChild/>
<p id="following-message">"Children following Suspense should hydrate properly."</p>
<div>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
<button on:click=move |_| set_count.update(|n| *n += 1)>{count}</button>
</div>
</div>
}
@ -256,16 +296,47 @@ fn InsideComponent() -> impl IntoView {
fn InsideComponentChild() -> impl IntoView {
let one_second = Resource::new(|| WAIT_ONE_SECOND, first_wait_fn);
view! {
<Suspense fallback=|| "Loading 1...">
{move || {
one_second.get().map(|_| view! {
<p id="loaded-1">"One Second: Loaded 1!"</p>
})
}}
<Suspense fallback=|| {
"Loading 1..."
}>
{move || {
one_second.get().map(|_| view! { <p id="loaded-1">"One Second: Loaded 1!"</p> })
}}
</Suspense>
}
}
#[component]
fn LocalResource() -> impl IntoView {
let one_second = Resource::new(|| WAIT_ONE_SECOND, first_wait_fn);
let local = LocalResource::new(|| first_wait_fn(WAIT_ONE_SECOND));
let (count, set_count) = signal(0);
view! {
<div>
<Suspense fallback=|| {
"Loading 1..."
}>
{move || {
one_second.get().map(|_| view! { <p id="loaded-1">"One Second: Loaded 1!"</p> })
}}
{move || {
Suspend(async move {
let value = local.await;
view! { <p id="loaded-2">"One Second: Local Loaded " {value} "!"</p> }
})
}}
</Suspense>
<p id="following-message">"Children following Suspense should hydrate properly."</p>
<div>
<button on:click=move |_| set_count.update(|n| *n += 1)>{count}</button>
</div>
</div>
}
}
#[component]
fn None() -> impl IntoView {
let (count, set_count) = signal(0);
@ -274,9 +345,7 @@ fn None() -> impl IntoView {
<div>
<Suspense fallback=|| "Loading 1...">
<p id="inside-message">"Children inside Suspense should hydrate properly."</p>
<button on:click=move |_| set_count.update(|n| *n += 1)>
{count}
</button>
<button on:click=move |_| set_count.update(|n| *n += 1)>{count}</button>
</Suspense>
<p id="following-message">"Children following Suspense should hydrate properly."</p>
<div>

View file

@ -84,4 +84,12 @@ impl SharedContext for CsrSharedContext {
fn await_deferred(&self) -> Option<PinnedFuture<()>> {
None
}
#[inline(always)]
fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
#[inline(always)]
fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool {
false
}
}

View file

@ -15,6 +15,8 @@ extern "C" {
static __RESOLVED_RESOURCES: Array;
static __SERIALIZED_ERRORS: Array;
static __INCOMPLETE_CHUNKS: Array;
}
fn serialized_errors() -> Vec<(SerializedDataId, ErrorId, Error)> {
@ -38,6 +40,16 @@ fn serialized_errors() -> Vec<(SerializedDataId, ErrorId, Error)> {
.collect()
}
fn incomplete_chunks() -> Vec<SerializedDataId> {
__INCOMPLETE_CHUNKS
.iter()
.map(|value| {
let id = value.as_f64().unwrap() as usize;
SerializedDataId(id)
})
.collect()
}
/// An error that has been serialized across the network boundary.
#[derive(Debug, Clone)]
struct SerializedError(String);
@ -57,6 +69,7 @@ pub struct HydrateSharedContext {
is_hydrating: AtomicBool,
during_hydration: AtomicBool,
errors: Lazy<Vec<(SerializedDataId, ErrorId, Error)>>,
incomplete: Lazy<Vec<SerializedDataId>>,
}
impl HydrateSharedContext {
@ -67,6 +80,7 @@ impl HydrateSharedContext {
is_hydrating: AtomicBool::new(true),
during_hydration: AtomicBool::new(true),
errors: Lazy::new(serialized_errors),
incomplete: Lazy::new(incomplete_chunks),
}
}
@ -80,6 +94,7 @@ impl HydrateSharedContext {
is_hydrating: AtomicBool::new(false),
during_hydration: AtomicBool::new(true),
errors: Lazy::new(serialized_errors),
incomplete: Lazy::new(incomplete_chunks),
}
}
}
@ -166,4 +181,11 @@ impl SharedContext for HydrateSharedContext {
fn await_deferred(&self) -> Option<PinnedFuture<()>> {
None
}
#[inline(always)]
fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool {
self.incomplete.iter().any(|entry| entry == id)
}
}

View file

@ -140,4 +140,11 @@ pub trait SharedContext: Debug {
///
/// In browser implementations, this should be a no-op.
fn await_deferred(&self) -> Option<PinnedFuture<()>>;
/// Tells the client that this chunk is being sent from the server before all its data have
/// loaded, and it may be in a fallback state.
fn set_incomplete_chunk(&self, id: SerializedDataId);
/// Checks whether this chunk is being sent from the server before all its data have loaded.
fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool;
}

View file

@ -2,7 +2,7 @@ use super::{SerializedDataId, SharedContext};
use crate::{PinnedFuture, PinnedStream};
use futures::{
future::join_all,
stream::{self},
stream::{self, once},
Stream, StreamExt,
};
use or_poisoned::OrPoisoned;
@ -33,6 +33,7 @@ pub struct SsrSharedContext {
errors: ErrorBuf,
sealed_error_boundaries: SealedErrors,
deferred: Mutex<Vec<PinnedFuture<()>>>,
incomplete: Arc<Mutex<Vec<SerializedDataId>>>,
}
impl SsrSharedContext {
@ -178,8 +179,19 @@ impl SharedContext for SsrSharedContext {
sealed_error_boundaries: Arc::clone(&self.sealed_error_boundaries),
};
let stream =
stream::once(async move { initial_chunk }).chain(async_data);
let incomplete = Arc::clone(&self.incomplete);
let stream = stream::once(async move { initial_chunk })
.chain(async_data)
.chain(once(async move {
let mut script = String::new();
script.push_str("__INCOMPLETE_CHUNKS=[");
for chunk in mem::take(&mut *incomplete.lock().or_poisoned()) {
_ = write!(script, "{},", chunk.0);
}
script.push_str("];");
script
}));
Some(Box::pin(stream))
}
@ -203,6 +215,18 @@ impl SharedContext for SsrSharedContext {
}))
}
}
fn set_incomplete_chunk(&self, id: SerializedDataId) {
self.incomplete.lock().or_poisoned().push(id);
}
fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool {
self.incomplete
.lock()
.or_poisoned()
.iter()
.any(|entry| entry == id)
}
}
struct AsyncDataStream {

View file

@ -2,14 +2,18 @@ use crate::{
children::{TypedChildren, ViewFnOnce},
IntoView,
};
use futures::FutureExt;
use futures::{select, FutureExt, StreamExt};
use hydration_context::SerializedDataId;
use leptos_macro::component;
use reactive_graph::{
computed::{suspense::SuspenseContext, ArcMemo, ScopedFuture},
computed::{
suspense::{LocalResourceNotifier, SuspenseContext},
ArcMemo, ScopedFuture,
},
effect::RenderEffect,
owner::{provide_context, use_context, Owner},
signal::ArcRwSignal,
traits::{Get, Read, Track, With},
traits::{Get, GetUntracked, Read, Track, With},
};
use slotmap::{DefaultKey, SlotMap};
use tachys::{
@ -36,15 +40,31 @@ pub fn Suspense<Chil>(
where
Chil: IntoView + Send + 'static,
{
let (starts_local, id) = {
Owner::current_shared_context()
.map(|sc| {
let id = sc.next_id();
(sc.get_incomplete_chunk(&id), id)
})
.unwrap_or_else(|| (false, Default::default()))
};
let fallback = fallback.run();
let children = children.into_inner()();
let tasks = ArcRwSignal::new(SlotMap::<DefaultKey, ()>::new());
provide_context(SuspenseContext {
tasks: tasks.clone(),
});
let none_pending = ArcMemo::new(move |_| tasks.with(SlotMap::is_empty));
let none_pending = ArcMemo::new(move |prev: Option<&bool>| {
tasks.track();
if prev.is_none() && starts_local {
false
} else {
tasks.with(SlotMap::is_empty)
}
});
OwnedView::new(SuspenseBoundary::<false, _, _> {
id,
none_pending,
fallback,
children,
@ -52,6 +72,7 @@ where
}
pub(crate) struct SuspenseBoundary<const TRANSITION: bool, Fal, Chil> {
pub id: SerializedDataId,
pub none_pending: ArcMemo<bool>,
pub fallback: Fal,
pub children: Chil,
@ -130,11 +151,13 @@ where
{
let attr = attr.into_cloneable_owned();
let SuspenseBoundary {
id,
none_pending,
fallback,
children,
} = self;
SuspenseBoundary {
id,
none_pending,
fallback,
children: children.add_any_attr(attr),
@ -183,15 +206,21 @@ where
let owner = Owner::current().unwrap();
let tasks = suspense_context.tasks.clone();
let (tx, rx) = futures::channel::oneshot::channel::<()>();
// we need to wait for one of two things: either
// 1. all tasks are finished loading, or
// 2. we read from a local resource, meaning this Suspense can never resolve on the server
let mut tx = Some(tx);
// first, create listener for tasks
let tasks = suspense_context.tasks.clone();
let (tasks_tx, mut tasks_rx) =
futures::channel::oneshot::channel::<()>();
let mut tasks_tx = Some(tasks_tx);
let eff = reactive_graph::effect::RenderEffect::new_isomorphic({
move |_| {
tasks.track();
if tasks.read().is_empty() {
if let Some(tx) = tx.take() {
if let Some(tx) = tasks_tx.take() {
// If the receiver has dropped, it means the ScopedFuture has already
// dropped, so it doesn't matter if we manage to send this.
_ = tx.send(());
@ -200,37 +229,75 @@ where
}
});
// now, create listener for local resources
let (local_tx, mut local_rx) =
futures::channel::oneshot::channel::<()>();
provide_context(LocalResourceNotifier::from(local_tx));
// walk over the tree of children once to make sure that all resource loads are registered
self.children.dry_resolve();
let mut fut =
Box::pin(ScopedFuture::new(ErrorHookFuture::new(async move {
// wait for all the resources to have loaded before trying to resolve the body
let mut fut = Box::pin(ScopedFuture::new(ErrorHookFuture::new(
async move {
// race the local resource notifier against the set of tasks
//
// if there are local resources, we just return the fallback immediately
//
// otherwise, we want to wait for resources to load before trying to resolve the body
//
// this is *less efficient* than just resolving the body
// however, it means that you can use reactive accesses to resources/async derived
// inside component props, at any level, and have those picked up by Suspense, and
// that it will wait for those to resolve
_ = rx.await;
select! {
// if there are local resources, bail
// this will only have fired by this point for local resources accessed
// *synchronously*
_ = local_rx => {
let sc = Owner::current_shared_context().expect("no shared context");
sc.set_incomplete_chunk(self.id);
None
}
_ = tasks_rx => {
// if we ran this earlier, reactive reads would always be registered as None
// this is fine in the case where we want to use Suspend and .await on some future
// but in situations like a <For each=|| some_resource.snapshot()/> we actually
// want to be able to 1) synchronously read a resource's value, but still 2) wait
// for it to load before we render everything
let mut children = Box::pin(self.children.resolve().fuse());
// if we ran this earlier, reactive reads would always be registered as None
// this is fine in the case where we want to use Suspend and .await on some future
// but in situations like a <For each=|| some_resource.snapshot()/> we actually
// want to be able to 1) synchronously read a resource's value, but still 2) wait
// for it to load before we render everything
let children = self.children.resolve().await;
// we continue racing the children against the "do we have any local
// resources?" Future
select! {
_ = local_rx => {
let sc = Owner::current_shared_context().expect("no shared context");
sc.set_incomplete_chunk(self.id);
None
}
children = children => {
// clean up the (now useless) effect
drop(eff);
// clean up the (now useless) effect
drop(eff);
OwnedView::new_with_owner(children, owner)
})));
Some(OwnedView::new_with_owner(children, owner))
}
}
}
}
},
)));
match fut.as_mut().now_or_never() {
Some(resolved) => {
Some(Some(resolved)) => {
Either::<Fal, _>::Right(resolved)
.to_html_async_with_buf::<OUT_OF_ORDER>(
buf, position, escape,
);
}
Some(None) => {
Either::<_, Chil>::Left(self.fallback)
.to_html_async_with_buf::<OUT_OF_ORDER>(
buf, position, escape,
);
}
None => {
let id = buf.clone_id();
@ -244,14 +311,16 @@ where
buf.push_async({
let mut position = *position;
async move {
let value = fut.await;
let value = match fut.await {
None => Either::Left(self.fallback),
Some(value) => Either::Right(value),
};
let mut builder = StreamBuilder::new(id);
Either::<Fal, _>::Right(value)
.to_html_async_with_buf::<OUT_OF_ORDER>(
&mut builder,
&mut position,
escape,
);
value.to_html_async_with_buf::<OUT_OF_ORDER>(
&mut builder,
&mut position,
escape,
);
builder.finish().take_chunks()
}
});
@ -268,6 +337,7 @@ where
) -> Self::State {
let cursor = cursor.to_owned();
let position = position.to_owned();
let mut children = Some(self.children);
let mut fallback = Some(self.fallback);
let none_pending = self.none_pending;

View file

@ -7,9 +7,9 @@ use leptos_macro::component;
use reactive_graph::{
computed::{suspense::SuspenseContext, ArcMemo},
effect::Effect,
owner::provide_context,
owner::{provide_context, Owner},
signal::ArcRwSignal,
traits::{Get, With},
traits::{Get, Track, With},
wrappers::write::SignalSetter,
};
use slotmap::{DefaultKey, SlotMap};
@ -31,13 +31,28 @@ pub fn Transition<Chil>(
where
Chil: IntoView + Send + 'static,
{
let (starts_local, id) = {
Owner::current_shared_context()
.map(|sc| {
let id = sc.next_id();
(sc.get_incomplete_chunk(&id), id)
})
.unwrap_or_else(|| (false, Default::default()))
};
let fallback = fallback.run();
let children = children.into_inner()();
let tasks = ArcRwSignal::new(SlotMap::<DefaultKey, ()>::new());
provide_context(SuspenseContext {
tasks: tasks.clone(),
});
let none_pending = ArcMemo::new(move |_| tasks.with(SlotMap::is_empty));
let none_pending = ArcMemo::new(move |prev: Option<&bool>| {
tasks.track();
if prev.is_none() && starts_local {
false
} else {
tasks.with(SlotMap::is_empty)
}
});
if let Some(set_pending) = set_pending {
Effect::new_isomorphic({
let none_pending = none_pending.clone();
@ -48,6 +63,7 @@ where
}
OwnedView::new(SuspenseBoundary::<true, _, _> {
id,
none_pending,
fallback,
children,

View file

@ -3,6 +3,8 @@
mod action;
pub use action::*;
mod local_resource;
pub use local_resource::*;
mod multi_action;
pub use multi_action::*;
mod resource;

View file

@ -0,0 +1,334 @@
use reactive_graph::{
computed::{
suspense::{LocalResourceNotifier, SuspenseContext},
ArcAsyncDerived, AsyncDerived, AsyncDerivedFuture,
},
graph::{
AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
ToAnySource, ToAnySubscriber,
},
owner::use_context,
signal::guards::{AsyncPlain, ReadGuard},
traits::{DefinedAt, ReadUntracked},
};
use std::{
future::{pending, Future, IntoFuture},
panic::Location,
};
pub struct ArcLocalResource<T> {
data: ArcAsyncDerived<T>,
#[cfg(debug_assertions)]
defined_at: &'static Location<'static>,
}
impl<T> Clone for ArcLocalResource<T> {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
#[cfg(debug_assertions)]
defined_at: self.defined_at,
}
}
}
impl<T> ArcLocalResource<T> {
#[track_caller]
pub fn new<Fut>(fetcher: impl Fn() -> Fut + Send + Sync + 'static) -> Self
where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
let fetcher = move || {
let fut = fetcher();
async move {
// in SSR mode, this will simply always be pending
// if we try to read from it, we will trigger Suspense automatically to fall back
// so this will never need to return anything
if cfg!(feature = "ssr") {
pending().await
} else {
fut.await
}
}
};
Self {
data: ArcAsyncDerived::new(fetcher),
#[cfg(debug_assertions)]
defined_at: Location::caller(),
}
}
}
impl<T> IntoFuture for ArcLocalResource<T>
where
T: Clone + 'static,
{
type Output = T;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.into_future()
}
}
impl<T> DefinedAt for ArcLocalResource<T> {
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(debug_assertions)]
{
Some(self.defined_at)
}
#[cfg(not(debug_assertions))]
{
None
}
}
}
impl<T> ReadUntracked for ArcLocalResource<T>
where
T: Send + Sync + 'static,
{
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.try_read_untracked()
}
}
impl<T: 'static> ToAnySource for ArcLocalResource<T> {
fn to_any_source(&self) -> AnySource {
self.data.to_any_source()
}
}
impl<T: 'static> ToAnySubscriber for ArcLocalResource<T> {
fn to_any_subscriber(&self) -> AnySubscriber {
self.data.to_any_subscriber()
}
}
impl<T> Source for ArcLocalResource<T> {
fn add_subscriber(&self, subscriber: AnySubscriber) {
self.data.add_subscriber(subscriber)
}
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
self.data.remove_subscriber(subscriber);
}
fn clear_subscribers(&self) {
self.data.clear_subscribers();
}
}
impl<T> ReactiveNode for ArcLocalResource<T> {
fn mark_dirty(&self) {
self.data.mark_dirty();
}
fn mark_check(&self) {
self.data.mark_check();
}
fn mark_subscribers_check(&self) {
self.data.mark_subscribers_check();
}
fn update_if_necessary(&self) -> bool {
self.data.update_if_necessary()
}
}
impl<T> Subscriber for ArcLocalResource<T> {
fn add_source(&self, source: AnySource) {
self.data.add_source(source);
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.data.clear_sources(subscriber);
}
}
pub struct LocalResource<T> {
data: AsyncDerived<T>,
#[cfg(debug_assertions)]
defined_at: &'static Location<'static>,
}
impl<T> Clone for LocalResource<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for LocalResource<T> {}
impl<T> LocalResource<T> {
#[track_caller]
pub fn new<Fut>(fetcher: impl Fn() -> Fut + Send + Sync + 'static) -> Self
where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
let fetcher = move || {
let fut = fetcher();
async move {
// in SSR mode, this will simply always be pending
// if we try to read from it, we will trigger Suspense automatically to fall back
// so this will never need to return anything
if cfg!(feature = "ssr") {
pending().await
} else {
fut.await
}
}
};
Self {
data: AsyncDerived::new(fetcher),
#[cfg(debug_assertions)]
defined_at: Location::caller(),
}
}
}
impl<T> IntoFuture for LocalResource<T>
where
T: Clone + 'static,
{
type Output = T;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.into_future()
}
}
impl<T> DefinedAt for LocalResource<T> {
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(debug_assertions)]
{
Some(self.defined_at)
}
#[cfg(not(debug_assertions))]
{
None
}
}
}
impl<T> ReadUntracked for LocalResource<T>
where
T: Send + Sync + 'static,
{
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.try_read_untracked()
}
}
impl<T: 'static> ToAnySource for LocalResource<T>
where
T: Send + Sync + 'static,
{
fn to_any_source(&self) -> AnySource {
self.data.to_any_source()
}
}
impl<T: 'static> ToAnySubscriber for LocalResource<T>
where
T: Send + Sync + 'static,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.data.to_any_subscriber()
}
}
impl<T> Source for LocalResource<T>
where
T: Send + Sync + 'static,
{
fn add_subscriber(&self, subscriber: AnySubscriber) {
self.data.add_subscriber(subscriber)
}
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
self.data.remove_subscriber(subscriber);
}
fn clear_subscribers(&self) {
self.data.clear_subscribers();
}
}
impl<T> ReactiveNode for LocalResource<T>
where
T: Send + Sync + 'static,
{
fn mark_dirty(&self) {
self.data.mark_dirty();
}
fn mark_check(&self) {
self.data.mark_check();
}
fn mark_subscribers_check(&self) {
self.data.mark_subscribers_check();
}
fn update_if_necessary(&self) -> bool {
self.data.update_if_necessary()
}
}
impl<T> Subscriber for LocalResource<T>
where
T: Send + Sync + 'static,
{
fn add_source(&self, source: AnySource) {
self.data.add_source(source);
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.data.clear_sources(subscriber);
}
}

View file

@ -41,7 +41,7 @@ impl<T: Send + Sync + 'static> AsyncDerived<T> {
pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Self {
#[cfg(debug_assertions)]

View file

@ -81,7 +81,27 @@ pub mod suspense {
signal::ArcRwSignal,
traits::{Update, Writeable},
};
use futures::channel::oneshot::Sender;
use or_poisoned::OrPoisoned;
use slotmap::{DefaultKey, SlotMap};
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct LocalResourceNotifier(Arc<Mutex<Option<Sender<()>>>>);
impl LocalResourceNotifier {
pub fn notify(&mut self) {
if let Some(tx) = self.0.lock().or_poisoned().take() {
tx.send(()).unwrap();
}
}
}
impl From<Sender<()>> for LocalResourceNotifier {
fn from(value: Sender<()>) -> Self {
Self(Arc::new(Mutex::new(Some(value))))
}
}
#[derive(Clone, Debug)]
pub struct SuspenseContext {

View file

@ -138,7 +138,7 @@ impl StreamBuilder {
pub fn push_async_out_of_order<View, Rndr>(
&mut self,
view: impl Future<Output = View> + Send + 'static,
view: impl Future<Output = Option<View>> + Send + 'static,
position: &mut Position,
) where
View: RenderHtml<Rndr>,
@ -160,18 +160,24 @@ impl StreamBuilder {
write!(&mut id, "{}-", piece).unwrap();
}
}
if let Some(id) = subbuilder.id.as_mut() {
id.push(0);
}
view.to_html_async_with_buf::<true>(
&mut subbuilder,
&mut position,
true,
);
let replace = view.is_some();
if let Some(view) = view {
view.to_html_async_with_buf::<true>(
&mut subbuilder,
&mut position,
true,
);
}
let chunks = subbuilder.finish().take_chunks();
OooChunk { id, chunks }
OooChunk {
id,
chunks,
replace,
}
}),
});
}
@ -201,6 +207,7 @@ pub enum StreamChunk {
struct OooChunk {
id: String,
chunks: VecDeque<StreamChunk>,
replace: bool,
}
impl OooChunk {
@ -211,7 +218,7 @@ impl OooChunk {
buf.push_str("\">");
}
pub fn push_end(id: &str, buf: &mut String) {
pub fn push_end(replace: bool, id: &str, buf: &mut String) {
buf.push_str("</template>");
// TODO nonce
@ -219,20 +226,26 @@ impl OooChunk {
buf.push_str(r#">(function() { let id = ""#);
buf.push_str(id);
buf.push_str(
"\";let open = undefined;let close = undefined;let walker \
= document.createTreeWalker(document.body, \
NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \
{if(walker.currentNode.textContent == `s-${id}o`){ \
open=walker.currentNode; } else \
if(walker.currentNode.textContent == `s-${id}c`) { close \
= walker.currentNode;}}let range = new Range(); \
range.setStartBefore(open); range.setEndBefore(close); \
range.deleteContents(); let tpl = \
document.getElementById(`${id}f`); \
close.parentNode.insertBefore(tpl.content.\
cloneNode(true), close);close.remove();})()",
"\";let open = undefined;let close = undefined;let walker = \
document.createTreeWalker(document.body, \
NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \
{if(walker.currentNode.textContent == `s-${id}o`){ \
open=walker.currentNode; } else \
if(walker.currentNode.textContent == `s-${id}c`) { close = \
walker.currentNode;}}let range = new Range(); \
range.setStartBefore(open); range.setEndBefore(close);",
);
buf.push_str("</script>");
if replace {
buf.push_str(
"range.deleteContents(); let tpl = \
document.getElementById(`${id}f`); \
close.parentNode.insertBefore(tpl.content.cloneNode(true), \
close);close.remove();",
);
} else {
buf.push_str("close.remove();open.remove();");
}
buf.push_str("})()</script>");
}
}
@ -279,7 +292,11 @@ impl Stream for StreamBuilder {
// now, handle out-of-order chunks
if let Some(mut pending) = this.pending_ooo.pop_front() {
match pending.as_mut().poll(cx) {
Poll::Ready(OooChunk { id, chunks }) => {
Poll::Ready(OooChunk {
id,
chunks,
replace,
}) => {
let opening = format!("<!--s-{id}o-->");
let placeholder_at =
this.sync_buf.find(&opening);
@ -324,7 +341,11 @@ impl Stream for StreamBuilder {
this.chunks.push_front(chunk);
}
}
OooChunk::push_end(&id, &mut this.sync_buf);
OooChunk::push_end(
replace,
&id,
&mut this.sync_buf,
);
}
self.poll_next(cx)
}