fix: prevent LocalResource from spawning a local task on the server, when it should only run on the client (closes #2717)

This commit is contained in:
Greg Johnston 2024-07-25 21:12:02 -04:00
parent 1534dd5261
commit 1c05389707
3 changed files with 100 additions and 64 deletions

View file

@ -202,8 +202,13 @@ impl<T> LocalResource<T> {
}
}
};
Self {
data: AsyncDerived::new_unsync(fetcher),
data: if cfg!(feature = "ssr") {
AsyncDerived::new_mock_unsync(fetcher)
} else {
AsyncDerived::new_unsync(fetcher)
},
#[cfg(debug_assertions)]
defined_at: Location::caller(),
}

View file

@ -195,7 +195,7 @@ impl<T> DefinedAt for ArcAsyncDerived<T> {
// whether `fun` returns a `Future` that is `Send`. Doing it as a function would,
// as far as I can tell, require repeating most of the function body.
macro_rules! spawn_derived {
($spawner:expr, $initial:ident, $fun:ident) => {{
($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal) => {{
let (notifier, mut rx) = channel();
let is_ready = $initial.is_some();
@ -262,76 +262,78 @@ macro_rules! spawn_derived {
any_subscriber.mark_dirty();
}
$spawner({
let value = Arc::downgrade(&this.value);
let inner = Arc::downgrade(&this.inner);
let wakers = Arc::downgrade(&this.wakers);
let loading = Arc::downgrade(&this.loading);
let fut = async move {
while rx.next().await.is_some() {
if first_run.is_some() || any_subscriber.update_if_necessary() {
match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
(Some(value), Some(inner), Some(wakers), Some(loading)) => {
// generate new Future
let owner = inner.read().or_poisoned().owner.clone();
let fut = initial_fut.take().unwrap_or_else(|| {
let fut = owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
if $should_spawn {
$spawner({
let value = Arc::downgrade(&this.value);
let inner = Arc::downgrade(&this.inner);
let wakers = Arc::downgrade(&this.wakers);
let loading = Arc::downgrade(&this.loading);
let fut = async move {
while rx.next().await.is_some() {
if first_run.is_some() || any_subscriber.update_if_necessary() {
match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
(Some(value), Some(inner), Some(wakers), Some(loading)) => {
// generate new Future
let owner = inner.read().or_poisoned().owner.clone();
let fut = initial_fut.take().unwrap_or_else(|| {
let fut = owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
});
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
Box::pin(fut)
});
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
Box::pin(fut)
});
// register with global transition listener, if any
let ready_tx = first_run.take().unwrap_or_else(|| {
let (ready_tx, ready_rx) = oneshot::channel();
AsyncTransition::register(ready_rx);
ready_tx
});
// register with global transition listener, if any
let ready_tx = first_run.take().unwrap_or_else(|| {
let (ready_tx, ready_rx) = oneshot::channel();
AsyncTransition::register(ready_rx);
ready_tx
});
// notify reactive subscribers that we're now loading
loading.store(true, Ordering::Relaxed);
inner.write().or_poisoned().dirty = true;
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// generate and assign new value
let new_value = fut.await;
loading.store(false, Ordering::Relaxed);
*value.write().await = Some(new_value);
inner.write().or_poisoned().dirty = true;
// if it's an Err, that just means the Receiver was dropped
// we don't particularly care about that: the point is to notify if
// it still exists, but we don't need to know if Suspense is no
// longer listening
_ = ready_tx.send(());
// notify reactive subscribers that we're not loading any more
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// notify async .awaiters
for waker in mem::take(&mut *wakers.write().or_poisoned()) {
waker.wake();
// notify reactive subscribers that we're now loading
loading.store(true, Ordering::Relaxed);
inner.write().or_poisoned().dirty = true;
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// generate and assign new value
let new_value = fut.await;
loading.store(false, Ordering::Relaxed);
*value.write().await = Some(new_value);
inner.write().or_poisoned().dirty = true;
// if it's an Err, that just means the Receiver was dropped
// we don't particularly care about that: the point is to notify if
// it still exists, but we don't need to know if Suspense is no
// longer listening
_ = ready_tx.send(());
// notify reactive subscribers that we're not loading any more
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// notify async .awaiters
for waker in mem::take(&mut *wakers.write().or_poisoned()) {
waker.wake();
}
}
_ => break,
}
_ => break,
}
}
}
};
};
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
fut
});
fut
});
}
(this, is_ready)
}};
@ -363,7 +365,8 @@ impl<T: 'static> ArcAsyncDerived<T> {
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
let (this, _) = spawn_derived!(Executor::spawn, initial_value, fun);
let (this, _) =
spawn_derived!(Executor::spawn, initial_value, fun, true);
this
}
@ -395,7 +398,20 @@ impl<T: 'static> ArcAsyncDerived<T> {
Fut: Future<Output = T> + 'static,
{
let (this, _) =
spawn_derived!(Executor::spawn_local, initial_value, fun);
spawn_derived!(Executor::spawn_local, initial_value, fun, true);
this
}
#[doc(hidden)]
#[track_caller]
pub fn new_mock_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
let initial = None::<T>;
let (this, _) =
spawn_derived!(Executor::spawn_local, initial, fun, false);
this
}

View file

@ -205,6 +205,21 @@ where
),
}
}
#[doc(hidden)]
pub fn new_mock_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
Self {
#[cfg(debug_assertions)]
defined_at: Location::caller(),
inner: StoredValue::new_with_storage(
ArcAsyncDerived::new_mock_unsync(fun),
),
}
}
}
impl<T, S> AsyncDerived<T, S>