From dc21e4ff53328107b2dfa5c0a2e960885e58aa44 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Thu, 4 Jul 2024 15:39:42 -0400 Subject: [PATCH] feat: local resources with .await --- examples/suspense_tests/src/app.rs | 205 +++++++---- hydration_context/src/csr.rs | 8 + hydration_context/src/hydrate.rs | 22 ++ hydration_context/src/lib.rs | 7 + hydration_context/src/ssr.rs | 30 +- leptos/src/suspense_component.rs | 132 +++++-- leptos/src/transition.rs | 22 +- leptos_server/src/lib.rs | 2 + leptos_server/src/local_resource.rs | 334 ++++++++++++++++++ .../computed/async_derived/async_derived.rs | 2 +- .../src/computed/async_derived/mod.rs | 20 ++ tachys/src/ssr/mod.rs | 69 ++-- 12 files changed, 723 insertions(+), 130 deletions(-) create mode 100644 leptos_server/src/local_resource.rs diff --git a/examples/suspense_tests/src/app.rs b/examples/suspense_tests/src/app.rs index a0467535c..6ee2b09fd 100644 --- a/examples/suspense_tests/src/app.rs +++ b/examples/suspense_tests/src/app.rs @@ -51,51 +51,63 @@ pub fn App() -> impl IntoView { // out-of-order -

"Out-of-Order"

- + view=|| { + view! { + +

"Out-of-Order"

+ + } } > + +
// in-order -

"In-Order"

- + view=|| { + view! { + +

"In-Order"

+ + } } > + +
// async -

"Async"

- + view=|| { + view! { + +

"Async"

+ + } } > + +
@@ -108,11 +120,16 @@ pub fn App() -> impl IntoView { fn SecondaryNav() -> impl IntoView { view! { } @@ -126,21 +143,28 @@ fn Nested() -> impl IntoView { view! {
- + {move || { - one_second.get().map(|_| view! { -

"One Second: Loaded 1!"

- }) + one_second.get().map(|_| view! {

"One Second: Loaded 1!"

}) }} - + {move || { - two_second.get().map(|_| view! { -

"Two Second: Loaded 2!"

- - }) + two_second + .get() + .map(|_| { + view! { +

"Two Second: Loaded 2!"

+ + } + }) }} +
@@ -154,25 +178,27 @@ fn NestedResourceInside() -> impl IntoView { view! {
- + {Suspend(async move { _ = one_second.await; - let two_second = Resource::new(|| (), move |_| async move { - second_wait_fn(WAIT_TWO_SECONDS).await - }); + let two_second = Resource::new( + || (), + move |_| async move { second_wait_fn(WAIT_TWO_SECONDS).await }, + ); view! {

"One Second: Loaded 1!"

"Loaded 2 (created inside first suspense)!: " - {Suspend(async move { format!("{:?}", two_second.await)})} + {Suspend(async move { format!("{:?}", two_second.await) })} - + } })} +
} @@ -186,25 +212,42 @@ fn Parallel() -> impl IntoView { view! {
- + {move || { - one_second.get().map(move |_| view! { -

"One Second: Loaded 1!"

- - }) + one_second + .get() + .map(move |_| { + view! { +

"One Second: Loaded 1!"

+ + } + }) }} +
- + {move || { - two_second.get().map(move |_| view! { -

"Two Second: Loaded 2!"

- - }) + two_second + .get() + .map(move |_| { + view! { +

"Two Second: Loaded 2!"

+ + } + }) }} +
} @@ -217,18 +260,17 @@ fn Single() -> impl IntoView { view! {
- - {move || { - one_second.get().map(|_| view! { -

"One Second: Loaded 1!"

- }) - }} + + {move || { + one_second.get().map(|_| view! {

"One Second: Loaded 1!"

}) + }} +

"Children following Suspense should hydrate properly."

- +
} @@ -244,9 +286,7 @@ fn InsideComponent() -> impl IntoView {

"Children following Suspense should hydrate properly."

- +
} @@ -256,16 +296,47 @@ fn InsideComponent() -> impl IntoView { fn InsideComponentChild() -> impl IntoView { let one_second = Resource::new(|| WAIT_ONE_SECOND, first_wait_fn); view! { - - {move || { - one_second.get().map(|_| view! { -

"One Second: Loaded 1!"

- }) - }} + + {move || { + one_second.get().map(|_| view! {

"One Second: Loaded 1!"

}) + }} +
} } +#[component] +fn LocalResource() -> impl IntoView { + let one_second = Resource::new(|| WAIT_ONE_SECOND, first_wait_fn); + let local = LocalResource::new(|| first_wait_fn(WAIT_ONE_SECOND)); + let (count, set_count) = signal(0); + + view! { +
+ + {move || { + one_second.get().map(|_| view! {

"One Second: Loaded 1!"

}) + }} + {move || { + Suspend(async move { + let value = local.await; + view! {

"One Second: Local Loaded " {value} "!"

} + }) + }} + +
+

"Children following Suspense should hydrate properly."

+
+ +
+
+ } +} + #[component] fn None() -> impl IntoView { let (count, set_count) = signal(0); @@ -274,9 +345,7 @@ fn None() -> impl IntoView {

"Children inside Suspense should hydrate properly."

- +

"Children following Suspense should hydrate properly."

diff --git a/hydration_context/src/csr.rs b/hydration_context/src/csr.rs index b914bc626..e36f24300 100644 --- a/hydration_context/src/csr.rs +++ b/hydration_context/src/csr.rs @@ -84,4 +84,12 @@ impl SharedContext for CsrSharedContext { fn await_deferred(&self) -> Option> { None } + + #[inline(always)] + fn set_incomplete_chunk(&self, _id: SerializedDataId) {} + + #[inline(always)] + fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool { + false + } } diff --git a/hydration_context/src/hydrate.rs b/hydration_context/src/hydrate.rs index 9195b1886..08cc6818f 100644 --- a/hydration_context/src/hydrate.rs +++ b/hydration_context/src/hydrate.rs @@ -15,6 +15,8 @@ extern "C" { static __RESOLVED_RESOURCES: Array; static __SERIALIZED_ERRORS: Array; + + static __INCOMPLETE_CHUNKS: Array; } fn serialized_errors() -> Vec<(SerializedDataId, ErrorId, Error)> { @@ -38,6 +40,16 @@ fn serialized_errors() -> Vec<(SerializedDataId, ErrorId, Error)> { .collect() } +fn incomplete_chunks() -> Vec { + __INCOMPLETE_CHUNKS + .iter() + .map(|value| { + let id = value.as_f64().unwrap() as usize; + SerializedDataId(id) + }) + .collect() +} + /// An error that has been serialized across the network boundary. #[derive(Debug, Clone)] struct SerializedError(String); @@ -57,6 +69,7 @@ pub struct HydrateSharedContext { is_hydrating: AtomicBool, during_hydration: AtomicBool, errors: Lazy>, + incomplete: Lazy>, } impl HydrateSharedContext { @@ -67,6 +80,7 @@ impl HydrateSharedContext { is_hydrating: AtomicBool::new(true), during_hydration: AtomicBool::new(true), errors: Lazy::new(serialized_errors), + incomplete: Lazy::new(incomplete_chunks), } } @@ -80,6 +94,7 @@ impl HydrateSharedContext { is_hydrating: AtomicBool::new(false), during_hydration: AtomicBool::new(true), errors: Lazy::new(serialized_errors), + incomplete: Lazy::new(incomplete_chunks), } } } @@ -166,4 +181,11 @@ impl SharedContext for HydrateSharedContext { fn await_deferred(&self) -> Option> { None } + + #[inline(always)] + fn set_incomplete_chunk(&self, _id: SerializedDataId) {} + + fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool { + self.incomplete.iter().any(|entry| entry == id) + } } diff --git a/hydration_context/src/lib.rs b/hydration_context/src/lib.rs index fa134434d..6d5a48a8d 100644 --- a/hydration_context/src/lib.rs +++ b/hydration_context/src/lib.rs @@ -140,4 +140,11 @@ pub trait SharedContext: Debug { /// /// In browser implementations, this should be a no-op. fn await_deferred(&self) -> Option>; + + /// Tells the client that this chunk is being sent from the server before all its data have + /// loaded, and it may be in a fallback state. + fn set_incomplete_chunk(&self, id: SerializedDataId); + + /// Checks whether this chunk is being sent from the server before all its data have loaded. + fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool; } diff --git a/hydration_context/src/ssr.rs b/hydration_context/src/ssr.rs index b36afcbe1..8a25f2908 100644 --- a/hydration_context/src/ssr.rs +++ b/hydration_context/src/ssr.rs @@ -2,7 +2,7 @@ use super::{SerializedDataId, SharedContext}; use crate::{PinnedFuture, PinnedStream}; use futures::{ future::join_all, - stream::{self}, + stream::{self, once}, Stream, StreamExt, }; use or_poisoned::OrPoisoned; @@ -33,6 +33,7 @@ pub struct SsrSharedContext { errors: ErrorBuf, sealed_error_boundaries: SealedErrors, deferred: Mutex>>, + incomplete: Arc>>, } impl SsrSharedContext { @@ -178,8 +179,19 @@ impl SharedContext for SsrSharedContext { sealed_error_boundaries: Arc::clone(&self.sealed_error_boundaries), }; - let stream = - stream::once(async move { initial_chunk }).chain(async_data); + let incomplete = Arc::clone(&self.incomplete); + + let stream = stream::once(async move { initial_chunk }) + .chain(async_data) + .chain(once(async move { + let mut script = String::new(); + script.push_str("__INCOMPLETE_CHUNKS=["); + for chunk in mem::take(&mut *incomplete.lock().or_poisoned()) { + _ = write!(script, "{},", chunk.0); + } + script.push_str("];"); + script + })); Some(Box::pin(stream)) } @@ -203,6 +215,18 @@ impl SharedContext for SsrSharedContext { })) } } + + fn set_incomplete_chunk(&self, id: SerializedDataId) { + self.incomplete.lock().or_poisoned().push(id); + } + + fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool { + self.incomplete + .lock() + .or_poisoned() + .iter() + .any(|entry| entry == id) + } } struct AsyncDataStream { diff --git a/leptos/src/suspense_component.rs b/leptos/src/suspense_component.rs index 4a16b0afa..933a0e969 100644 --- a/leptos/src/suspense_component.rs +++ b/leptos/src/suspense_component.rs @@ -2,14 +2,18 @@ use crate::{ children::{TypedChildren, ViewFnOnce}, IntoView, }; -use futures::FutureExt; +use futures::{select, FutureExt, StreamExt}; +use hydration_context::SerializedDataId; use leptos_macro::component; use reactive_graph::{ - computed::{suspense::SuspenseContext, ArcMemo, ScopedFuture}, + computed::{ + suspense::{LocalResourceNotifier, SuspenseContext}, + ArcMemo, ScopedFuture, + }, effect::RenderEffect, owner::{provide_context, use_context, Owner}, signal::ArcRwSignal, - traits::{Get, Read, Track, With}, + traits::{Get, GetUntracked, Read, Track, With}, }; use slotmap::{DefaultKey, SlotMap}; use tachys::{ @@ -36,15 +40,31 @@ pub fn Suspense( where Chil: IntoView + Send + 'static, { + let (starts_local, id) = { + Owner::current_shared_context() + .map(|sc| { + let id = sc.next_id(); + (sc.get_incomplete_chunk(&id), id) + }) + .unwrap_or_else(|| (false, Default::default())) + }; let fallback = fallback.run(); let children = children.into_inner()(); let tasks = ArcRwSignal::new(SlotMap::::new()); provide_context(SuspenseContext { tasks: tasks.clone(), }); - let none_pending = ArcMemo::new(move |_| tasks.with(SlotMap::is_empty)); + let none_pending = ArcMemo::new(move |prev: Option<&bool>| { + tasks.track(); + if prev.is_none() && starts_local { + false + } else { + tasks.with(SlotMap::is_empty) + } + }); OwnedView::new(SuspenseBoundary:: { + id, none_pending, fallback, children, @@ -52,6 +72,7 @@ where } pub(crate) struct SuspenseBoundary { + pub id: SerializedDataId, pub none_pending: ArcMemo, pub fallback: Fal, pub children: Chil, @@ -130,11 +151,13 @@ where { let attr = attr.into_cloneable_owned(); let SuspenseBoundary { + id, none_pending, fallback, children, } = self; SuspenseBoundary { + id, none_pending, fallback, children: children.add_any_attr(attr), @@ -183,15 +206,21 @@ where let owner = Owner::current().unwrap(); - let tasks = suspense_context.tasks.clone(); - let (tx, rx) = futures::channel::oneshot::channel::<()>(); + // we need to wait for one of two things: either + // 1. all tasks are finished loading, or + // 2. we read from a local resource, meaning this Suspense can never resolve on the server - let mut tx = Some(tx); + // first, create listener for tasks + let tasks = suspense_context.tasks.clone(); + let (tasks_tx, mut tasks_rx) = + futures::channel::oneshot::channel::<()>(); + + let mut tasks_tx = Some(tasks_tx); let eff = reactive_graph::effect::RenderEffect::new_isomorphic({ move |_| { tasks.track(); if tasks.read().is_empty() { - if let Some(tx) = tx.take() { + if let Some(tx) = tasks_tx.take() { // If the receiver has dropped, it means the ScopedFuture has already // dropped, so it doesn't matter if we manage to send this. _ = tx.send(()); @@ -200,37 +229,75 @@ where } }); + // now, create listener for local resources + let (local_tx, mut local_rx) = + futures::channel::oneshot::channel::<()>(); + provide_context(LocalResourceNotifier::from(local_tx)); + // walk over the tree of children once to make sure that all resource loads are registered self.children.dry_resolve(); - let mut fut = - Box::pin(ScopedFuture::new(ErrorHookFuture::new(async move { - // wait for all the resources to have loaded before trying to resolve the body + let mut fut = Box::pin(ScopedFuture::new(ErrorHookFuture::new( + async move { + // race the local resource notifier against the set of tasks + // + // if there are local resources, we just return the fallback immediately + // + // otherwise, we want to wait for resources to load before trying to resolve the body + // // this is *less efficient* than just resolving the body // however, it means that you can use reactive accesses to resources/async derived // inside component props, at any level, and have those picked up by Suspense, and // that it will wait for those to resolve - _ = rx.await; + select! { + // if there are local resources, bail + // this will only have fired by this point for local resources accessed + // *synchronously* + _ = local_rx => { + let sc = Owner::current_shared_context().expect("no shared context"); + sc.set_incomplete_chunk(self.id); + None + } + _ = tasks_rx => { + // if we ran this earlier, reactive reads would always be registered as None + // this is fine in the case where we want to use Suspend and .await on some future + // but in situations like a we actually + // want to be able to 1) synchronously read a resource's value, but still 2) wait + // for it to load before we render everything + let mut children = Box::pin(self.children.resolve().fuse()); - // if we ran this earlier, reactive reads would always be registered as None - // this is fine in the case where we want to use Suspend and .await on some future - // but in situations like a we actually - // want to be able to 1) synchronously read a resource's value, but still 2) wait - // for it to load before we render everything - let children = self.children.resolve().await; + // we continue racing the children against the "do we have any local + // resources?" Future + select! { + _ = local_rx => { + let sc = Owner::current_shared_context().expect("no shared context"); + sc.set_incomplete_chunk(self.id); + None + } + children = children => { + // clean up the (now useless) effect + drop(eff); - // clean up the (now useless) effect - drop(eff); - - OwnedView::new_with_owner(children, owner) - }))); + Some(OwnedView::new_with_owner(children, owner)) + } + } + } + } + }, + ))); match fut.as_mut().now_or_never() { - Some(resolved) => { + Some(Some(resolved)) => { Either::::Right(resolved) .to_html_async_with_buf::( buf, position, escape, ); } + Some(None) => { + Either::<_, Chil>::Left(self.fallback) + .to_html_async_with_buf::( + buf, position, escape, + ); + } None => { let id = buf.clone_id(); @@ -244,14 +311,16 @@ where buf.push_async({ let mut position = *position; async move { - let value = fut.await; + let value = match fut.await { + None => Either::Left(self.fallback), + Some(value) => Either::Right(value), + }; let mut builder = StreamBuilder::new(id); - Either::::Right(value) - .to_html_async_with_buf::( - &mut builder, - &mut position, - escape, - ); + value.to_html_async_with_buf::( + &mut builder, + &mut position, + escape, + ); builder.finish().take_chunks() } }); @@ -268,6 +337,7 @@ where ) -> Self::State { let cursor = cursor.to_owned(); let position = position.to_owned(); + let mut children = Some(self.children); let mut fallback = Some(self.fallback); let none_pending = self.none_pending; diff --git a/leptos/src/transition.rs b/leptos/src/transition.rs index 125b3471c..06d5117bb 100644 --- a/leptos/src/transition.rs +++ b/leptos/src/transition.rs @@ -7,9 +7,9 @@ use leptos_macro::component; use reactive_graph::{ computed::{suspense::SuspenseContext, ArcMemo}, effect::Effect, - owner::provide_context, + owner::{provide_context, Owner}, signal::ArcRwSignal, - traits::{Get, With}, + traits::{Get, Track, With}, wrappers::write::SignalSetter, }; use slotmap::{DefaultKey, SlotMap}; @@ -31,13 +31,28 @@ pub fn Transition( where Chil: IntoView + Send + 'static, { + let (starts_local, id) = { + Owner::current_shared_context() + .map(|sc| { + let id = sc.next_id(); + (sc.get_incomplete_chunk(&id), id) + }) + .unwrap_or_else(|| (false, Default::default())) + }; let fallback = fallback.run(); let children = children.into_inner()(); let tasks = ArcRwSignal::new(SlotMap::::new()); provide_context(SuspenseContext { tasks: tasks.clone(), }); - let none_pending = ArcMemo::new(move |_| tasks.with(SlotMap::is_empty)); + let none_pending = ArcMemo::new(move |prev: Option<&bool>| { + tasks.track(); + if prev.is_none() && starts_local { + false + } else { + tasks.with(SlotMap::is_empty) + } + }); if let Some(set_pending) = set_pending { Effect::new_isomorphic({ let none_pending = none_pending.clone(); @@ -48,6 +63,7 @@ where } OwnedView::new(SuspenseBoundary:: { + id, none_pending, fallback, children, diff --git a/leptos_server/src/lib.rs b/leptos_server/src/lib.rs index 00951f54a..b02e2d749 100644 --- a/leptos_server/src/lib.rs +++ b/leptos_server/src/lib.rs @@ -3,6 +3,8 @@ mod action; pub use action::*; +mod local_resource; +pub use local_resource::*; mod multi_action; pub use multi_action::*; mod resource; diff --git a/leptos_server/src/local_resource.rs b/leptos_server/src/local_resource.rs new file mode 100644 index 000000000..175b33876 --- /dev/null +++ b/leptos_server/src/local_resource.rs @@ -0,0 +1,334 @@ +use reactive_graph::{ + computed::{ + suspense::{LocalResourceNotifier, SuspenseContext}, + ArcAsyncDerived, AsyncDerived, AsyncDerivedFuture, + }, + graph::{ + AnySource, AnySubscriber, ReactiveNode, Source, Subscriber, + ToAnySource, ToAnySubscriber, + }, + owner::use_context, + signal::guards::{AsyncPlain, ReadGuard}, + traits::{DefinedAt, ReadUntracked}, +}; +use std::{ + future::{pending, Future, IntoFuture}, + panic::Location, +}; + +pub struct ArcLocalResource { + data: ArcAsyncDerived, + #[cfg(debug_assertions)] + defined_at: &'static Location<'static>, +} + +impl Clone for ArcLocalResource { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + #[cfg(debug_assertions)] + defined_at: self.defined_at, + } + } +} + +impl ArcLocalResource { + #[track_caller] + pub fn new(fetcher: impl Fn() -> Fut + Send + Sync + 'static) -> Self + where + T: Send + Sync + 'static, + Fut: Future + Send + 'static, + { + let fetcher = move || { + let fut = fetcher(); + async move { + // in SSR mode, this will simply always be pending + // if we try to read from it, we will trigger Suspense automatically to fall back + // so this will never need to return anything + if cfg!(feature = "ssr") { + pending().await + } else { + fut.await + } + } + }; + Self { + data: ArcAsyncDerived::new(fetcher), + #[cfg(debug_assertions)] + defined_at: Location::caller(), + } + } +} + +impl IntoFuture for ArcLocalResource +where + T: Clone + 'static, +{ + type Output = T; + type IntoFuture = AsyncDerivedFuture; + + fn into_future(self) -> Self::IntoFuture { + if let Some(mut notifier) = use_context::() { + notifier.notify(); + } else if cfg!(feature = "ssr") { + panic!( + "Reading from a LocalResource outside Suspense in `ssr` mode \ + will cause the response to hang, because LocalResources are \ + always pending on the server." + ); + } + self.data.into_future() + } +} + +impl DefinedAt for ArcLocalResource { + fn defined_at(&self) -> Option<&'static Location<'static>> { + #[cfg(debug_assertions)] + { + Some(self.defined_at) + } + #[cfg(not(debug_assertions))] + { + None + } + } +} + +impl ReadUntracked for ArcLocalResource +where + T: Send + Sync + 'static, +{ + type Value = ReadGuard, AsyncPlain>>; + + fn try_read_untracked(&self) -> Option { + if let Some(mut notifier) = use_context::() { + notifier.notify(); + } else if cfg!(feature = "ssr") { + panic!( + "Reading from a LocalResource outside Suspense in `ssr` mode \ + will cause the response to hang, because LocalResources are \ + always pending on the server." + ); + } + self.data.try_read_untracked() + } +} + +impl ToAnySource for ArcLocalResource { + fn to_any_source(&self) -> AnySource { + self.data.to_any_source() + } +} + +impl ToAnySubscriber for ArcLocalResource { + fn to_any_subscriber(&self) -> AnySubscriber { + self.data.to_any_subscriber() + } +} + +impl Source for ArcLocalResource { + fn add_subscriber(&self, subscriber: AnySubscriber) { + self.data.add_subscriber(subscriber) + } + + fn remove_subscriber(&self, subscriber: &AnySubscriber) { + self.data.remove_subscriber(subscriber); + } + + fn clear_subscribers(&self) { + self.data.clear_subscribers(); + } +} + +impl ReactiveNode for ArcLocalResource { + fn mark_dirty(&self) { + self.data.mark_dirty(); + } + + fn mark_check(&self) { + self.data.mark_check(); + } + + fn mark_subscribers_check(&self) { + self.data.mark_subscribers_check(); + } + + fn update_if_necessary(&self) -> bool { + self.data.update_if_necessary() + } +} + +impl Subscriber for ArcLocalResource { + fn add_source(&self, source: AnySource) { + self.data.add_source(source); + } + + fn clear_sources(&self, subscriber: &AnySubscriber) { + self.data.clear_sources(subscriber); + } +} + +pub struct LocalResource { + data: AsyncDerived, + #[cfg(debug_assertions)] + defined_at: &'static Location<'static>, +} + +impl Clone for LocalResource { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for LocalResource {} + +impl LocalResource { + #[track_caller] + pub fn new(fetcher: impl Fn() -> Fut + Send + Sync + 'static) -> Self + where + T: Send + Sync + 'static, + Fut: Future + Send + 'static, + { + let fetcher = move || { + let fut = fetcher(); + async move { + // in SSR mode, this will simply always be pending + // if we try to read from it, we will trigger Suspense automatically to fall back + // so this will never need to return anything + if cfg!(feature = "ssr") { + pending().await + } else { + fut.await + } + } + }; + Self { + data: AsyncDerived::new(fetcher), + #[cfg(debug_assertions)] + defined_at: Location::caller(), + } + } +} + +impl IntoFuture for LocalResource +where + T: Clone + 'static, +{ + type Output = T; + type IntoFuture = AsyncDerivedFuture; + + fn into_future(self) -> Self::IntoFuture { + if let Some(mut notifier) = use_context::() { + notifier.notify(); + } else if cfg!(feature = "ssr") { + panic!( + "Reading from a LocalResource outside Suspense in `ssr` mode \ + will cause the response to hang, because LocalResources are \ + always pending on the server." + ); + } + self.data.into_future() + } +} + +impl DefinedAt for LocalResource { + fn defined_at(&self) -> Option<&'static Location<'static>> { + #[cfg(debug_assertions)] + { + Some(self.defined_at) + } + #[cfg(not(debug_assertions))] + { + None + } + } +} + +impl ReadUntracked for LocalResource +where + T: Send + Sync + 'static, +{ + type Value = ReadGuard, AsyncPlain>>; + + fn try_read_untracked(&self) -> Option { + if let Some(mut notifier) = use_context::() { + notifier.notify(); + } else if cfg!(feature = "ssr") { + panic!( + "Reading from a LocalResource outside Suspense in `ssr` mode \ + will cause the response to hang, because LocalResources are \ + always pending on the server." + ); + } + self.data.try_read_untracked() + } +} + +impl ToAnySource for LocalResource +where + T: Send + Sync + 'static, +{ + fn to_any_source(&self) -> AnySource { + self.data.to_any_source() + } +} + +impl ToAnySubscriber for LocalResource +where + T: Send + Sync + 'static, +{ + fn to_any_subscriber(&self) -> AnySubscriber { + self.data.to_any_subscriber() + } +} + +impl Source for LocalResource +where + T: Send + Sync + 'static, +{ + fn add_subscriber(&self, subscriber: AnySubscriber) { + self.data.add_subscriber(subscriber) + } + + fn remove_subscriber(&self, subscriber: &AnySubscriber) { + self.data.remove_subscriber(subscriber); + } + + fn clear_subscribers(&self) { + self.data.clear_subscribers(); + } +} + +impl ReactiveNode for LocalResource +where + T: Send + Sync + 'static, +{ + fn mark_dirty(&self) { + self.data.mark_dirty(); + } + + fn mark_check(&self) { + self.data.mark_check(); + } + + fn mark_subscribers_check(&self) { + self.data.mark_subscribers_check(); + } + + fn update_if_necessary(&self) -> bool { + self.data.update_if_necessary() + } +} + +impl Subscriber for LocalResource +where + T: Send + Sync + 'static, +{ + fn add_source(&self, source: AnySource) { + self.data.add_source(source); + } + + fn clear_sources(&self, subscriber: &AnySubscriber) { + self.data.clear_sources(subscriber); + } +} diff --git a/reactive_graph/src/computed/async_derived/async_derived.rs b/reactive_graph/src/computed/async_derived/async_derived.rs index 7f05d3ca7..bcfffd88f 100644 --- a/reactive_graph/src/computed/async_derived/async_derived.rs +++ b/reactive_graph/src/computed/async_derived/async_derived.rs @@ -41,7 +41,7 @@ impl AsyncDerived { pub fn new(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self where T: Send + Sync + 'static, - Fut: Future + Send + Sync + 'static, + Fut: Future + Send + 'static, { Self { #[cfg(debug_assertions)] diff --git a/reactive_graph/src/computed/async_derived/mod.rs b/reactive_graph/src/computed/async_derived/mod.rs index d64224a96..b32a768c5 100644 --- a/reactive_graph/src/computed/async_derived/mod.rs +++ b/reactive_graph/src/computed/async_derived/mod.rs @@ -81,7 +81,27 @@ pub mod suspense { signal::ArcRwSignal, traits::{Update, Writeable}, }; + use futures::channel::oneshot::Sender; + use or_poisoned::OrPoisoned; use slotmap::{DefaultKey, SlotMap}; + use std::sync::{Arc, Mutex}; + + #[derive(Clone, Debug)] + pub struct LocalResourceNotifier(Arc>>>); + + impl LocalResourceNotifier { + pub fn notify(&mut self) { + if let Some(tx) = self.0.lock().or_poisoned().take() { + tx.send(()).unwrap(); + } + } + } + + impl From> for LocalResourceNotifier { + fn from(value: Sender<()>) -> Self { + Self(Arc::new(Mutex::new(Some(value)))) + } + } #[derive(Clone, Debug)] pub struct SuspenseContext { diff --git a/tachys/src/ssr/mod.rs b/tachys/src/ssr/mod.rs index ed010c0f2..57bc86812 100644 --- a/tachys/src/ssr/mod.rs +++ b/tachys/src/ssr/mod.rs @@ -138,7 +138,7 @@ impl StreamBuilder { pub fn push_async_out_of_order( &mut self, - view: impl Future + Send + 'static, + view: impl Future> + Send + 'static, position: &mut Position, ) where View: RenderHtml, @@ -160,18 +160,24 @@ impl StreamBuilder { write!(&mut id, "{}-", piece).unwrap(); } } - if let Some(id) = subbuilder.id.as_mut() { id.push(0); } - view.to_html_async_with_buf::( - &mut subbuilder, - &mut position, - true, - ); + let replace = view.is_some(); + if let Some(view) = view { + view.to_html_async_with_buf::( + &mut subbuilder, + &mut position, + true, + ); + } let chunks = subbuilder.finish().take_chunks(); - OooChunk { id, chunks } + OooChunk { + id, + chunks, + replace, + } }), }); } @@ -201,6 +207,7 @@ pub enum StreamChunk { struct OooChunk { id: String, chunks: VecDeque, + replace: bool, } impl OooChunk { @@ -211,7 +218,7 @@ impl OooChunk { buf.push_str("\">"); } - pub fn push_end(id: &str, buf: &mut String) { + pub fn push_end(replace: bool, id: &str, buf: &mut String) { buf.push_str(""); // TODO nonce @@ -219,20 +226,26 @@ impl OooChunk { buf.push_str(r#">(function() { let id = ""#); buf.push_str(id); buf.push_str( - "\";let open = undefined;let close = undefined;let walker \ - = document.createTreeWalker(document.body, \ - NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \ - {if(walker.currentNode.textContent == `s-${id}o`){ \ - open=walker.currentNode; } else \ - if(walker.currentNode.textContent == `s-${id}c`) { close \ - = walker.currentNode;}}let range = new Range(); \ - range.setStartBefore(open); range.setEndBefore(close); \ - range.deleteContents(); let tpl = \ - document.getElementById(`${id}f`); \ - close.parentNode.insertBefore(tpl.content.\ - cloneNode(true), close);close.remove();})()", + "\";let open = undefined;let close = undefined;let walker = \ + document.createTreeWalker(document.body, \ + NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \ + {if(walker.currentNode.textContent == `s-${id}o`){ \ + open=walker.currentNode; } else \ + if(walker.currentNode.textContent == `s-${id}c`) { close = \ + walker.currentNode;}}let range = new Range(); \ + range.setStartBefore(open); range.setEndBefore(close);", ); - buf.push_str(""); + if replace { + buf.push_str( + "range.deleteContents(); let tpl = \ + document.getElementById(`${id}f`); \ + close.parentNode.insertBefore(tpl.content.cloneNode(true), \ + close);close.remove();", + ); + } else { + buf.push_str("close.remove();open.remove();"); + } + buf.push_str("})()"); } } @@ -279,7 +292,11 @@ impl Stream for StreamBuilder { // now, handle out-of-order chunks if let Some(mut pending) = this.pending_ooo.pop_front() { match pending.as_mut().poll(cx) { - Poll::Ready(OooChunk { id, chunks }) => { + Poll::Ready(OooChunk { + id, + chunks, + replace, + }) => { let opening = format!(""); let placeholder_at = this.sync_buf.find(&opening); @@ -324,7 +341,11 @@ impl Stream for StreamBuilder { this.chunks.push_front(chunk); } } - OooChunk::push_end(&id, &mut this.sync_buf); + OooChunk::push_end( + replace, + &id, + &mut this.sync_buf, + ); } self.poll_next(cx) }