From b0d8d4ee267719a085ebcc7578500ac2355a46eb Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Wed, 18 Sep 2024 21:35:37 -0400 Subject: [PATCH] fix: properly trigger Suspense when Suspend is called again (#2993) --- .../async_derived/arc_async_derived.rs | 19 ++++++++++++++++--- .../computed/async_derived/future_impls.rs | 16 ++++++++++++++-- .../src/computed/async_derived/inner.rs | 2 ++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index cfc2d9202..ae27ddd9f 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -233,7 +233,8 @@ macro_rules! spawn_derived { sources: SourceSet::new(), subscribers: SubscriberSet::new(), state: AsyncDerivedState::Clean, - version: 0 + version: 0, + suspenses: Vec::new() })); let value = Arc::new(AsyncRwLock::new($initial)); let wakers = Arc::new(RwLock::new(Vec::new())); @@ -345,14 +346,21 @@ macro_rules! spawn_derived { // generate and assign new value loading.store(true, Ordering::Relaxed); - let this_version = { + let (this_version, suspense_ids) = { let mut guard = inner.write().or_poisoned(); guard.version += 1; - guard.version + let version = guard.version; + let suspense_ids = mem::take(&mut guard.suspenses) + .into_iter() + .map(|sc| sc.task_id()) + .collect::>(); + (version, suspense_ids) }; let new_value = fut.await; + drop(suspense_ids); + let latest_version = inner.read().or_poisoned().version; if latest_version == this_version { @@ -575,6 +583,11 @@ impl ReadUntracked for ArcAsyncDerived { ready.await; drop(handle); }); + self.inner + .write() + .or_poisoned() + .suspenses + .push(suspense_context); } } AsyncPlain::try_new(&self.value).map(ReadGuard::new) diff --git a/reactive_graph/src/computed/async_derived/future_impls.rs b/reactive_graph/src/computed/async_derived/future_impls.rs index e71d5f384..db8096f0e 100644 --- a/reactive_graph/src/computed/async_derived/future_impls.rs +++ b/reactive_graph/src/computed/async_derived/future_impls.rs @@ -1,8 +1,9 @@ -use super::{ArcAsyncDerived, AsyncDerived}; +use super::{inner::ArcAsyncDerivedInner, ArcAsyncDerived, AsyncDerived}; use crate::{ + computed::suspense::SuspenseContext, diagnostics::SpecialNonReactiveZone, graph::{AnySource, ToAnySource}, - owner::Storage, + owner::{use_context, Storage}, signal::guards::{AsyncPlain, Mapped, ReadGuard}, traits::{DefinedAt, Track}, unwrap_signal, @@ -63,6 +64,7 @@ where value: Arc::clone(&self.value), loading: Arc::clone(&self.loading), wakers: Arc::clone(&self.wakers), + inner: Arc::clone(&self.inner), } } } @@ -92,6 +94,7 @@ pub struct AsyncDerivedFuture { value: Arc>>, loading: Arc, wakers: Arc>>, + inner: Arc>, } impl Future for AsyncDerivedFuture @@ -107,6 +110,15 @@ where let waker = cx.waker(); self.source.track(); let value = self.value.read_arc(); + + if let Some(suspense_context) = use_context::() { + self.inner + .write() + .or_poisoned() + .suspenses + .push(suspense_context); + } + pin_mut!(value); match (self.loading.load(Ordering::Relaxed), value.poll(cx)) { (true, _) => { diff --git a/reactive_graph/src/computed/async_derived/inner.rs b/reactive_graph/src/computed/async_derived/inner.rs index bd42cb448..c22640621 100644 --- a/reactive_graph/src/computed/async_derived/inner.rs +++ b/reactive_graph/src/computed/async_derived/inner.rs @@ -1,5 +1,6 @@ use crate::{ channel::Sender, + computed::suspense::SuspenseContext, graph::{ AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber, SubscriberSet, @@ -20,6 +21,7 @@ pub(crate) struct ArcAsyncDerivedInner { pub notifier: Sender, pub state: AsyncDerivedState, pub version: usize, + pub suspenses: Vec, } #[derive(Debug, PartialEq, Eq)]