check whether ArcAsyncDerived actually needs to run when marked check

This commit is contained in:
Greg Johnston 2024-05-20 08:05:44 -04:00
parent 941689fc5b
commit ce5f2c81ed

View file

@ -1,5 +1,5 @@
use super::{
inner::ArcAsyncDerivedInner, AsyncDerivedReadyFuture, AsyncState,
inner::ArcAsyncDerivedInner, ArcAsyncDerivedReadyFuture, AsyncState,
ScopedFuture,
};
#[cfg(feature = "sandboxed-arenas")]
@ -102,6 +102,7 @@ macro_rules! spawn_derived {
inner: Arc::clone(&inner),
};
let any_subscriber = this.to_any_subscriber();
let mut first_run = true;
$spawner({
let value = Arc::downgrade(&this.value);
@ -109,48 +110,51 @@ macro_rules! spawn_derived {
let wakers = Arc::downgrade(&this.wakers);
let fut = async move {
while rx.next().await.is_some() {
match (value.upgrade(), inner.upgrade(), wakers.upgrade()) {
(Some(value), Some(inner), Some(wakers)) => {
// generate new Future
let owner = inner.read().or_poisoned().owner.clone();
let fut = owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
});
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
if first_run || any_subscriber.update_if_necessary() {
first_run = false;
match (value.upgrade(), inner.upgrade(), wakers.upgrade()) {
(Some(value), Some(inner), Some(wakers)) => {
// generate new Future
let owner = inner.read().or_poisoned().owner.clone();
let fut = owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
});
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
// update state from Complete to Reloading
{
let mut value = value.write().or_poisoned();
// if it's initial Loading, it will just reset to Loading
if let AsyncState::Complete(old) =
mem::take(&mut *value)
// update state from Complete to Reloading
{
*value = AsyncState::Reloading(old);
let mut value = value.write().or_poisoned();
// if it's initial Loading, it will just reset to Loading
if let AsyncState::Complete(old) =
mem::take(&mut *value)
{
*value = AsyncState::Reloading(old);
}
}
// notify reactive subscribers that we're now loading
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// generate and assign new value
let new_value = fut.await;
*value.write().or_poisoned() = AsyncState::Complete(new_value);
// notify reactive subscribers that we're not loading any more
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// notify async .awaiters
for waker in mem::take(&mut *wakers.write().or_poisoned()) {
waker.wake();
}
}
// notify reactive subscribers that we're now loading
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// generate and assign new value
let new_value = fut.await;
*value.write().or_poisoned() = AsyncState::Complete(new_value);
// notify reactive subscribers that we're not loading any more
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_check();
}
// notify async .awaiters
for waker in mem::take(&mut *wakers.write().or_poisoned()) {
waker.wake();
}
_ => break,
}
_ => break,
}
}
};
@ -211,8 +215,8 @@ impl<T: 'static> ArcAsyncDerived<T> {
this
}
pub fn ready(&self) -> AsyncDerivedReadyFuture<T> {
AsyncDerivedReadyFuture {
pub fn ready(&self) -> ArcAsyncDerivedReadyFuture<T> {
ArcAsyncDerivedReadyFuture {
source: self.to_any_source(),
value: Arc::clone(&self.value),
wakers: Arc::clone(&self.wakers),