diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 516ed159f..16359c9e4 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -1,5 +1,5 @@ use super::{ - inner::ArcAsyncDerivedInner, AsyncDerivedReadyFuture, AsyncState, + inner::ArcAsyncDerivedInner, ArcAsyncDerivedReadyFuture, AsyncState, ScopedFuture, }; #[cfg(feature = "sandboxed-arenas")] @@ -102,6 +102,7 @@ macro_rules! spawn_derived { inner: Arc::clone(&inner), }; let any_subscriber = this.to_any_subscriber(); + let mut first_run = true; $spawner({ let value = Arc::downgrade(&this.value); @@ -109,48 +110,51 @@ macro_rules! spawn_derived { let wakers = Arc::downgrade(&this.wakers); let fut = async move { while rx.next().await.is_some() { - match (value.upgrade(), inner.upgrade(), wakers.upgrade()) { - (Some(value), Some(inner), Some(wakers)) => { - // generate new Future - let owner = inner.read().or_poisoned().owner.clone(); - let fut = owner.with_cleanup(|| { - any_subscriber - .with_observer(|| ScopedFuture::new($fun())) - }); - #[cfg(feature = "sandboxed-arenas")] - let fut = Sandboxed::new(fut); + if first_run || any_subscriber.update_if_necessary() { + first_run = false; + match (value.upgrade(), inner.upgrade(), wakers.upgrade()) { + (Some(value), Some(inner), Some(wakers)) => { + // generate new Future + let owner = inner.read().or_poisoned().owner.clone(); + let fut = owner.with_cleanup(|| { + any_subscriber + .with_observer(|| ScopedFuture::new($fun())) + }); + #[cfg(feature = "sandboxed-arenas")] + let fut = Sandboxed::new(fut); - // update state from Complete to Reloading - { - let mut value = value.write().or_poisoned(); - // if it's initial Loading, it will just reset to Loading - if let AsyncState::Complete(old) = - mem::take(&mut *value) + // update state from Complete to Reloading { - *value = AsyncState::Reloading(old); + let mut value = value.write().or_poisoned(); + // if it's initial Loading, it will just reset to Loading + if let AsyncState::Complete(old) = + mem::take(&mut *value) + { + *value = AsyncState::Reloading(old); + } + } + + // notify reactive subscribers that we're now loading + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_check(); + } + + // generate and assign new value + let new_value = fut.await; + *value.write().or_poisoned() = AsyncState::Complete(new_value); + + // notify reactive subscribers that we're not loading any more + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_check(); + } + + // notify async .awaiters + for waker in mem::take(&mut *wakers.write().or_poisoned()) { + waker.wake(); } } - - // notify reactive subscribers that we're now loading - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_check(); - } - - // generate and assign new value - let new_value = fut.await; - *value.write().or_poisoned() = AsyncState::Complete(new_value); - - // notify reactive subscribers that we're not loading any more - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_check(); - } - - // notify async .awaiters - for waker in mem::take(&mut *wakers.write().or_poisoned()) { - waker.wake(); - } + _ => break, } - _ => break, } } }; @@ -211,8 +215,8 @@ impl ArcAsyncDerived { this } - pub fn ready(&self) -> AsyncDerivedReadyFuture { - AsyncDerivedReadyFuture { + pub fn ready(&self) -> ArcAsyncDerivedReadyFuture { + ArcAsyncDerivedReadyFuture { source: self.to_any_source(), value: Arc::clone(&self.value), wakers: Arc::clone(&self.wakers),