fix: do not retrigger parent effect when Suspend's resources resolve (closes #2956)

This commit is contained in:
Greg Johnston 2024-09-09 18:10:48 -04:00
parent 5a57d48913
commit 7c0889e873
4 changed files with 135 additions and 48 deletions

View file

@ -719,7 +719,7 @@ where
}
fn to_html(self, style: &mut String) {
if let Some(inner) = self.now_or_never() {
if let Some(inner) = self.inner.now_or_never() {
inner.to_html(style);
} else {
panic!("You cannot use Suspend on an attribute outside Suspense");
@ -736,7 +736,8 @@ where
let state = Rc::clone(&state);
async move {
*state.borrow_mut() =
Some(self.await.hydrate::<FROM_SERVER>(&el));
Some(self.inner.await.hydrate::<FROM_SERVER>(&el));
self.subscriber.forward();
}
});
state
@ -748,7 +749,8 @@ where
Executor::spawn_local({
let state = Rc::clone(&state);
async move {
*state.borrow_mut() = Some(self.await.build(&el));
*state.borrow_mut() = Some(self.inner.await.build(&el));
self.subscriber.forward();
}
});
state
@ -758,11 +760,12 @@ where
Executor::spawn_local({
let state = Rc::clone(state);
async move {
let value = self.await;
let value = self.inner.await;
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
value.rebuild(state);
}
self.subscriber.forward();
}
});
}
@ -778,6 +781,6 @@ where
fn dry_resolve(&mut self) {}
async fn resolve(self) -> Self::AsyncOutput {
self.await
self.inner.await
}
}

View file

@ -401,7 +401,8 @@ where
let state = Rc::clone(&state);
async move {
*state.borrow_mut() =
Some(self.await.hydrate::<FROM_SERVER>(&key, &el));
Some(self.inner.await.hydrate::<FROM_SERVER>(&key, &el));
self.subscriber.forward();
}
});
state
@ -414,7 +415,8 @@ where
Executor::spawn_local({
let state = Rc::clone(&state);
async move {
*state.borrow_mut() = Some(self.await.build(&el, &key));
*state.borrow_mut() = Some(self.inner.await.build(&el, &key));
self.subscriber.forward();
}
});
state
@ -425,11 +427,12 @@ where
Executor::spawn_local({
let state = Rc::clone(state);
async move {
let value = self.await;
let value = self.inner.await;
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
value.rebuild(&key, state);
}
self.subscriber.forward();
}
});
}
@ -447,7 +450,7 @@ where
fn dry_resolve(&mut self) {}
async fn resolve(self) -> Self::AsyncOutput {
self.await
self.inner.await
}
}

View file

@ -447,7 +447,7 @@ where
type CloneableOwned = Self;
fn to_html(self, style: &mut String) {
if let Some(inner) = self.now_or_never() {
if let Some(inner) = self.inner.now_or_never() {
inner.to_html(style);
} else {
panic!("You cannot use Suspend on an attribute outside Suspense");
@ -464,7 +464,8 @@ where
let state = Rc::clone(&state);
async move {
*state.borrow_mut() =
Some(self.await.hydrate::<FROM_SERVER>(&el));
Some(self.inner.await.hydrate::<FROM_SERVER>(&el));
self.subscriber.forward();
}
});
state
@ -476,7 +477,8 @@ where
Executor::spawn_local({
let state = Rc::clone(&state);
async move {
*state.borrow_mut() = Some(self.await.build(&el));
*state.borrow_mut() = Some(self.inner.await.build(&el));
self.subscriber.forward();
}
});
state
@ -486,11 +488,12 @@ where
Executor::spawn_local({
let state = Rc::clone(state);
async move {
let value = self.await;
let value = self.inner.await;
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
value.rebuild(state);
}
self.subscriber.forward();
}
});
}
@ -506,6 +509,6 @@ where
fn dry_resolve(&mut self) {}
async fn resolve(self) -> Self::AsyncOutput {
self.await
self.inner.await
}
}

View file

@ -9,57 +9,123 @@ use crate::{
},
};
use any_spawner::Executor;
use futures::{select, FutureExt};
use futures::{channel::oneshot, select, FutureExt};
use or_poisoned::OrPoisoned;
use reactive_graph::{
computed::{
suspense::{LocalResourceNotifier, SuspenseContext},
ScopedFuture,
},
graph::{
AnySource, AnySubscriber, Observer, ReactiveNode, Source, Subscriber,
ToAnySubscriber, WithObserver,
},
owner::{provide_context, use_context},
};
use std::{
cell::RefCell,
fmt::Debug,
future::Future,
mem,
pin::Pin,
rc::Rc,
task::{Context, Poll},
sync::{Arc, Mutex, Weak},
};
/// A suspended `Future`, which can be used in the view.
#[derive(Clone)]
pub struct Suspend<Fut> {
inner: Pin<Box<ScopedFuture<Fut>>>,
pub(crate) subscriber: SuspendSubscriber,
pub(crate) inner: Pin<Box<ScopedFuture<Fut>>>,
}
#[derive(Debug, Clone)]
pub(crate) struct SuspendSubscriber {
inner: Arc<SuspendSubscriberInner>,
}
#[derive(Debug)]
struct SuspendSubscriberInner {
outer_subscriber: Option<AnySubscriber>,
sources: Mutex<Vec<AnySource>>,
cancel: Mutex<Option<oneshot::Sender<()>>>,
}
impl SuspendSubscriber {
pub fn new(cancel: oneshot::Sender<()>) -> Self {
let outer_subscriber = Observer::get();
Self {
inner: Arc::new(SuspendSubscriberInner {
outer_subscriber,
sources: Default::default(),
cancel: Mutex::new(Some(cancel)),
}),
}
}
/// Re-links all reactive sources from this to another subscriber.
///
/// This is used to collect reactive dependencies during the rendering phase, and only later
/// connect them to any outer effect, to prevent the completion of async resources from
/// triggering the render effect to run a second time.
pub fn forward(&self) {
if let Some(to) = &self.inner.outer_subscriber {
let sources =
mem::take(&mut *self.inner.sources.lock().or_poisoned());
for source in sources {
source.add_subscriber(to.clone());
to.add_source(source);
}
}
}
}
impl ReactiveNode for SuspendSubscriberInner {
fn mark_dirty(&self) {
if let Some(tx) = self.cancel.lock().or_poisoned().take() {
_ = tx.send(());
}
}
fn mark_check(&self) {}
fn mark_subscribers_check(&self) {}
fn update_if_necessary(&self) -> bool {
false
}
}
impl Subscriber for SuspendSubscriberInner {
fn add_source(&self, source: AnySource) {
self.sources.lock().or_poisoned().push(source);
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
for source in mem::take(&mut *self.sources.lock().or_poisoned()) {
source.remove_subscriber(subscriber);
}
}
}
impl ToAnySubscriber for SuspendSubscriber {
fn to_any_subscriber(&self) -> AnySubscriber {
AnySubscriber(
Arc::as_ptr(&self.inner) as usize,
Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
)
}
}
impl<Fut> Suspend<Fut> {
/// Creates a new suspended view.
pub fn new(fut: Fut) -> Self {
Self {
inner: Box::pin(ScopedFuture::new(fut)),
}
}
}
impl<Fut> Future for Suspend<Fut>
where
Fut: Future,
{
type Output = Fut::Output;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
impl<Fut> From<ScopedFuture<Fut>> for Suspend<Fut> {
fn from(inner: ScopedFuture<Fut>) -> Self {
Self {
inner: Box::pin(inner),
}
let (tx, rx) = oneshot::channel();
let subscriber = SuspendSubscriber::new(tx);
let any_subscriber = subscriber.to_any_subscriber();
let inner =
any_subscriber.with_observer(|| Box::pin(ScopedFuture::new(fut)));
Self { subscriber, inner }
}
}
@ -106,10 +172,12 @@ where
// TODO cancelation if it fires multiple times
fn build(self) -> Self::State {
let Self { subscriber, inner } = self;
// poll the future once immediately
// if it's already available, start in the ready state
// otherwise, start with the fallback
let mut fut = Box::pin(self);
let mut fut = Box::pin(inner);
let initial = fut.as_mut().now_or_never();
let initially_pending = initial.is_none();
let inner = Rc::new(RefCell::new(initial.build()));
@ -127,6 +195,8 @@ where
let value = fut.as_mut().await;
drop(id);
Some(value).rebuild(&mut *state.borrow_mut());
subscriber.forward();
}
});
}
@ -135,8 +205,10 @@ where
}
fn rebuild(self, state: &mut Self::State) {
let Self { subscriber, inner } = self;
// get a unique ID if there's a SuspenseContext
let fut = self;
let fut = inner;
let id = use_context::<SuspenseContext>().map(|sc| sc.task_id());
// spawn the future, and rebuild the state when it resolves
@ -150,6 +222,8 @@ where
// has no parent
any_spawner::Executor::tick().await;
Some(value).rebuild(&mut *state.borrow_mut());
subscriber.forward();
}
});
}
@ -208,7 +282,7 @@ where
// TODO wrap this with a Suspense as needed
// currently this is just used for Routes, which creates a Suspend but never actually needs
// it (because we don't lazy-load routes on the server)
if let Some(inner) = self.now_or_never() {
if let Some(inner) = self.inner.now_or_never() {
inner.to_html_with_buf(buf, position, escape, mark_branches);
}
}
@ -222,7 +296,7 @@ where
) where
Self: Sized,
{
let mut fut = Box::pin(self);
let mut fut = Box::pin(self.inner);
match fut.as_mut().now_or_never() {
Some(inner) => inner.to_html_async_with_buf::<OUT_OF_ORDER>(
buf,
@ -287,10 +361,12 @@ where
cursor: &Cursor<Rndr>,
position: &PositionState,
) -> Self::State {
let Self { subscriber, inner } = self;
// poll the future once immediately
// if it's already available, start in the ready state
// otherwise, start with the fallback
let mut fut = Box::pin(self);
let mut fut = Box::pin(inner);
let initial = fut.as_mut().now_or_never();
let initially_pending = initial.is_none();
let inner = Rc::new(RefCell::new(
@ -310,6 +386,8 @@ where
let value = fut.as_mut().await;
drop(id);
Some(value).rebuild(&mut *state.borrow_mut());
subscriber.forward();
}
});
}
@ -318,7 +396,7 @@ where
}
async fn resolve(self) -> Self::AsyncOutput {
Some(self.await)
Some(self.inner.await)
}
fn dry_resolve(&mut self) {