mirror of
https://github.com/leptos-rs/leptos
synced 2024-11-10 06:44:17 +00:00
Merge pull request #2959 from leptos-rs/2956
fix: do not retrigger parent effect when Suspend's resources resolve (closes #2956)
This commit is contained in:
commit
2bf04072ea
5 changed files with 132 additions and 47 deletions
3
.github/workflows/get-leptos-changed.yml
vendored
3
.github/workflows/get-leptos-changed.yml
vendored
|
@ -43,11 +43,14 @@ jobs:
|
|||
oco/**
|
||||
or_poisoned/**
|
||||
reactive_graph/**
|
||||
reactive_stores/**
|
||||
reactive_stores_macro/**
|
||||
router/**
|
||||
router_macro/**
|
||||
server_fn/**
|
||||
server_fn/server_fn_macro_default/**
|
||||
server_fn_macro/**
|
||||
tachys/**
|
||||
|
||||
- name: List source files that changed
|
||||
run: echo '${{ steps.changed-source.outputs.all_changed_files }}'
|
||||
|
|
|
@ -719,7 +719,7 @@ where
|
|||
}
|
||||
|
||||
fn to_html(self, style: &mut String) {
|
||||
if let Some(inner) = self.now_or_never() {
|
||||
if let Some(inner) = self.inner.now_or_never() {
|
||||
inner.to_html(style);
|
||||
} else {
|
||||
panic!("You cannot use Suspend on an attribute outside Suspense");
|
||||
|
@ -736,7 +736,8 @@ where
|
|||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() =
|
||||
Some(self.await.hydrate::<FROM_SERVER>(&el));
|
||||
Some(self.inner.await.hydrate::<FROM_SERVER>(&el));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -748,7 +749,8 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() = Some(self.await.build(&el));
|
||||
*state.borrow_mut() = Some(self.inner.await.build(&el));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -758,11 +760,12 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(state);
|
||||
async move {
|
||||
let value = self.await;
|
||||
let value = self.inner.await;
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
value.rebuild(state);
|
||||
}
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -778,6 +781,6 @@ where
|
|||
fn dry_resolve(&mut self) {}
|
||||
|
||||
async fn resolve(self) -> Self::AsyncOutput {
|
||||
self.await
|
||||
self.inner.await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -401,7 +401,8 @@ where
|
|||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() =
|
||||
Some(self.await.hydrate::<FROM_SERVER>(&key, &el));
|
||||
Some(self.inner.await.hydrate::<FROM_SERVER>(&key, &el));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -414,7 +415,8 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() = Some(self.await.build(&el, &key));
|
||||
*state.borrow_mut() = Some(self.inner.await.build(&el, &key));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -425,11 +427,12 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(state);
|
||||
async move {
|
||||
let value = self.await;
|
||||
let value = self.inner.await;
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
value.rebuild(&key, state);
|
||||
}
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -447,7 +450,7 @@ where
|
|||
fn dry_resolve(&mut self) {}
|
||||
|
||||
async fn resolve(self) -> Self::AsyncOutput {
|
||||
self.await
|
||||
self.inner.await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -447,7 +447,7 @@ where
|
|||
type CloneableOwned = Self;
|
||||
|
||||
fn to_html(self, style: &mut String) {
|
||||
if let Some(inner) = self.now_or_never() {
|
||||
if let Some(inner) = self.inner.now_or_never() {
|
||||
inner.to_html(style);
|
||||
} else {
|
||||
panic!("You cannot use Suspend on an attribute outside Suspense");
|
||||
|
@ -464,7 +464,8 @@ where
|
|||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() =
|
||||
Some(self.await.hydrate::<FROM_SERVER>(&el));
|
||||
Some(self.inner.await.hydrate::<FROM_SERVER>(&el));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -476,7 +477,8 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(&state);
|
||||
async move {
|
||||
*state.borrow_mut() = Some(self.await.build(&el));
|
||||
*state.borrow_mut() = Some(self.inner.await.build(&el));
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
state
|
||||
|
@ -486,11 +488,12 @@ where
|
|||
Executor::spawn_local({
|
||||
let state = Rc::clone(state);
|
||||
async move {
|
||||
let value = self.await;
|
||||
let value = self.inner.await;
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
value.rebuild(state);
|
||||
}
|
||||
self.subscriber.forward();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -506,6 +509,6 @@ where
|
|||
fn dry_resolve(&mut self) {}
|
||||
|
||||
async fn resolve(self) -> Self::AsyncOutput {
|
||||
self.await
|
||||
self.inner.await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,56 +10,115 @@ use crate::{
|
|||
};
|
||||
use any_spawner::Executor;
|
||||
use futures::{select, FutureExt};
|
||||
use or_poisoned::OrPoisoned;
|
||||
use reactive_graph::{
|
||||
computed::{
|
||||
suspense::{LocalResourceNotifier, SuspenseContext},
|
||||
ScopedFuture,
|
||||
},
|
||||
graph::{
|
||||
AnySource, AnySubscriber, Observer, ReactiveNode, Source, Subscriber,
|
||||
ToAnySubscriber, WithObserver,
|
||||
},
|
||||
owner::{provide_context, use_context},
|
||||
};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
fmt::Debug,
|
||||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
task::{Context, Poll},
|
||||
sync::{Arc, Mutex, Weak},
|
||||
};
|
||||
|
||||
/// A suspended `Future`, which can be used in the view.
|
||||
#[derive(Clone)]
|
||||
pub struct Suspend<Fut> {
|
||||
inner: Pin<Box<ScopedFuture<Fut>>>,
|
||||
pub(crate) subscriber: SuspendSubscriber,
|
||||
pub(crate) inner: Pin<Box<ScopedFuture<Fut>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SuspendSubscriber {
|
||||
inner: Arc<SuspendSubscriberInner>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SuspendSubscriberInner {
|
||||
outer_subscriber: Option<AnySubscriber>,
|
||||
sources: Mutex<Vec<AnySource>>,
|
||||
}
|
||||
|
||||
impl SuspendSubscriber {
|
||||
pub fn new() -> Self {
|
||||
let outer_subscriber = Observer::get();
|
||||
Self {
|
||||
inner: Arc::new(SuspendSubscriberInner {
|
||||
outer_subscriber,
|
||||
sources: Default::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Re-links all reactive sources from this to another subscriber.
|
||||
///
|
||||
/// This is used to collect reactive dependencies during the rendering phase, and only later
|
||||
/// connect them to any outer effect, to prevent the completion of async resources from
|
||||
/// triggering the render effect to run a second time.
|
||||
pub fn forward(&self) {
|
||||
if let Some(to) = &self.inner.outer_subscriber {
|
||||
let sources =
|
||||
mem::take(&mut *self.inner.sources.lock().or_poisoned());
|
||||
for source in sources {
|
||||
source.add_subscriber(to.clone());
|
||||
to.add_source(source);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReactiveNode for SuspendSubscriberInner {
|
||||
fn mark_dirty(&self) {}
|
||||
|
||||
fn mark_check(&self) {}
|
||||
|
||||
fn mark_subscribers_check(&self) {}
|
||||
|
||||
fn update_if_necessary(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl Subscriber for SuspendSubscriberInner {
|
||||
fn add_source(&self, source: AnySource) {
|
||||
self.sources.lock().or_poisoned().push(source);
|
||||
}
|
||||
|
||||
fn clear_sources(&self, subscriber: &AnySubscriber) {
|
||||
for source in mem::take(&mut *self.sources.lock().or_poisoned()) {
|
||||
source.remove_subscriber(subscriber);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToAnySubscriber for SuspendSubscriber {
|
||||
fn to_any_subscriber(&self) -> AnySubscriber {
|
||||
AnySubscriber(
|
||||
Arc::as_ptr(&self.inner) as usize,
|
||||
Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut> Suspend<Fut> {
|
||||
/// Creates a new suspended view.
|
||||
pub fn new(fut: Fut) -> Self {
|
||||
Self {
|
||||
inner: Box::pin(ScopedFuture::new(fut)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut> Future for Suspend<Fut>
|
||||
where
|
||||
Fut: Future,
|
||||
{
|
||||
type Output = Fut::Output;
|
||||
|
||||
fn poll(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Self::Output> {
|
||||
self.inner.as_mut().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut> From<ScopedFuture<Fut>> for Suspend<Fut> {
|
||||
fn from(inner: ScopedFuture<Fut>) -> Self {
|
||||
Self {
|
||||
inner: Box::pin(inner),
|
||||
}
|
||||
let subscriber = SuspendSubscriber::new();
|
||||
let any_subscriber = subscriber.to_any_subscriber();
|
||||
let inner =
|
||||
any_subscriber.with_observer(|| Box::pin(ScopedFuture::new(fut)));
|
||||
Self { subscriber, inner }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,10 +165,12 @@ where
|
|||
|
||||
// TODO cancelation if it fires multiple times
|
||||
fn build(self) -> Self::State {
|
||||
let Self { subscriber, inner } = self;
|
||||
|
||||
// poll the future once immediately
|
||||
// if it's already available, start in the ready state
|
||||
// otherwise, start with the fallback
|
||||
let mut fut = Box::pin(self);
|
||||
let mut fut = Box::pin(inner);
|
||||
let initial = fut.as_mut().now_or_never();
|
||||
let initially_pending = initial.is_none();
|
||||
let inner = Rc::new(RefCell::new(initial.build()));
|
||||
|
@ -127,6 +188,8 @@ where
|
|||
let value = fut.as_mut().await;
|
||||
drop(id);
|
||||
Some(value).rebuild(&mut *state.borrow_mut());
|
||||
|
||||
subscriber.forward();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -135,8 +198,10 @@ where
|
|||
}
|
||||
|
||||
fn rebuild(self, state: &mut Self::State) {
|
||||
let Self { subscriber, inner } = self;
|
||||
|
||||
// get a unique ID if there's a SuspenseContext
|
||||
let fut = self;
|
||||
let fut = inner;
|
||||
let id = use_context::<SuspenseContext>().map(|sc| sc.task_id());
|
||||
|
||||
// spawn the future, and rebuild the state when it resolves
|
||||
|
@ -150,6 +215,8 @@ where
|
|||
// has no parent
|
||||
any_spawner::Executor::tick().await;
|
||||
Some(value).rebuild(&mut *state.borrow_mut());
|
||||
|
||||
subscriber.forward();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -208,7 +275,7 @@ where
|
|||
// TODO wrap this with a Suspense as needed
|
||||
// currently this is just used for Routes, which creates a Suspend but never actually needs
|
||||
// it (because we don't lazy-load routes on the server)
|
||||
if let Some(inner) = self.now_or_never() {
|
||||
if let Some(inner) = self.inner.now_or_never() {
|
||||
inner.to_html_with_buf(buf, position, escape, mark_branches);
|
||||
}
|
||||
}
|
||||
|
@ -222,7 +289,7 @@ where
|
|||
) where
|
||||
Self: Sized,
|
||||
{
|
||||
let mut fut = Box::pin(self);
|
||||
let mut fut = Box::pin(self.inner);
|
||||
match fut.as_mut().now_or_never() {
|
||||
Some(inner) => inner.to_html_async_with_buf::<OUT_OF_ORDER>(
|
||||
buf,
|
||||
|
@ -287,10 +354,12 @@ where
|
|||
cursor: &Cursor<Rndr>,
|
||||
position: &PositionState,
|
||||
) -> Self::State {
|
||||
let Self { subscriber, inner } = self;
|
||||
|
||||
// poll the future once immediately
|
||||
// if it's already available, start in the ready state
|
||||
// otherwise, start with the fallback
|
||||
let mut fut = Box::pin(self);
|
||||
let mut fut = Box::pin(inner);
|
||||
let initial = fut.as_mut().now_or_never();
|
||||
let initially_pending = initial.is_none();
|
||||
let inner = Rc::new(RefCell::new(
|
||||
|
@ -310,15 +379,19 @@ where
|
|||
let value = fut.as_mut().await;
|
||||
drop(id);
|
||||
Some(value).rebuild(&mut *state.borrow_mut());
|
||||
|
||||
subscriber.forward();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
subscriber.forward();
|
||||
}
|
||||
|
||||
SuspendState { inner }
|
||||
}
|
||||
|
||||
async fn resolve(self) -> Self::AsyncOutput {
|
||||
Some(self.await)
|
||||
Some(self.inner.await)
|
||||
}
|
||||
|
||||
fn dry_resolve(&mut self) {
|
||||
|
|
Loading…
Reference in a new issue