mirror of
https://github.com/leptos-rs/leptos
synced 2025-02-02 23:13:25 +00:00
parent
f0c5ffe55f
commit
9cc8ee3c5a
6 changed files with 150 additions and 55 deletions
reactive_graph
|
@ -15,7 +15,6 @@ use crate::{
|
|||
};
|
||||
pub use arc_memo::*;
|
||||
pub use async_derived::*;
|
||||
pub(crate) use inner::MemoInner;
|
||||
pub use memo::*;
|
||||
pub use selector::*;
|
||||
|
||||
|
|
|
@ -12,11 +12,10 @@ use crate::{
|
|||
traits::{DefinedAt, Get, IsDisposed, ReadUntracked},
|
||||
};
|
||||
use core::fmt::Debug;
|
||||
use or_poisoned::OrPoisoned;
|
||||
use std::{
|
||||
hash::Hash,
|
||||
panic::Location,
|
||||
sync::{Arc, RwLock, Weak},
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
|
||||
/// An efficient derived reactive value based on other reactive values.
|
||||
|
@ -95,7 +94,7 @@ where
|
|||
{
|
||||
#[cfg(any(debug_assertions, leptos_debuginfo))]
|
||||
defined_at: &'static Location<'static>,
|
||||
inner: Arc<RwLock<MemoInner<T, S>>>,
|
||||
inner: Arc<MemoInner<T, S>>,
|
||||
}
|
||||
|
||||
impl<T: 'static> ArcMemo<T, SyncStorage>
|
||||
|
@ -161,7 +160,7 @@ where
|
|||
Weak::clone(weak) as Weak<dyn Subscriber + Send + Sync>,
|
||||
);
|
||||
|
||||
RwLock::new(MemoInner::new(Arc::new(fun), subscriber))
|
||||
MemoInner::new(Arc::new(fun), subscriber)
|
||||
});
|
||||
Self {
|
||||
#[cfg(any(debug_assertions, leptos_debuginfo))]
|
||||
|
@ -312,15 +311,11 @@ where
|
|||
S: Storage<T>,
|
||||
{
|
||||
fn add_source(&self, source: AnySource) {
|
||||
self.inner.write().or_poisoned().sources.insert(source);
|
||||
self.inner.add_source(source);
|
||||
}
|
||||
|
||||
fn clear_sources(&self, subscriber: &AnySubscriber) {
|
||||
self.inner
|
||||
.write()
|
||||
.or_poisoned()
|
||||
.sources
|
||||
.clear_sources(subscriber);
|
||||
self.inner.clear_sources(subscriber);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,15 +323,15 @@ impl<T: 'static, S> ReadUntracked for ArcMemo<T, S>
|
|||
where
|
||||
S: Storage<T>,
|
||||
{
|
||||
type Value = ReadGuard<T, Mapped<Plain<MemoInner<T, S>>, T>>;
|
||||
type Value = ReadGuard<T, Mapped<Plain<Option<S::Wrapped>>, T>>;
|
||||
|
||||
fn try_read_untracked(&self) -> Option<Self::Value> {
|
||||
self.update_if_necessary();
|
||||
|
||||
Mapped::try_new(Arc::clone(&self.inner), |t| {
|
||||
Mapped::try_new(Arc::clone(&self.inner.value), |t| {
|
||||
// safe to unwrap here because update_if_necessary
|
||||
// guarantees the value is Some
|
||||
t.value.as_ref().unwrap().as_borrowed()
|
||||
t.as_ref().unwrap().as_borrowed()
|
||||
})
|
||||
.map(ReadGuard::new)
|
||||
}
|
||||
|
|
|
@ -15,10 +15,15 @@ pub struct MemoInner<T, S>
|
|||
where
|
||||
S: Storage<T>,
|
||||
{
|
||||
pub(crate) value: Option<S::Wrapped>,
|
||||
/// Must always be aquired *after* the reactivity lock
|
||||
pub(crate) value: Arc<RwLock<Option<S::Wrapped>>>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(crate) fun: Arc<dyn Fn(Option<T>) -> (T, bool) + Send + Sync>,
|
||||
pub(crate) owner: Owner,
|
||||
pub(crate) reactivity: RwLock<MemoInnerReactivity>,
|
||||
}
|
||||
|
||||
pub(crate) struct MemoInnerReactivity {
|
||||
pub(crate) state: ReactiveNodeState,
|
||||
pub(crate) sources: SourceSet,
|
||||
pub(crate) subscribers: SubscriberSet,
|
||||
|
@ -44,40 +49,44 @@ where
|
|||
any_subscriber: AnySubscriber,
|
||||
) -> Self {
|
||||
Self {
|
||||
value: None,
|
||||
value: Arc::new(RwLock::new(None)),
|
||||
fun,
|
||||
owner: Owner::new(),
|
||||
state: ReactiveNodeState::Dirty,
|
||||
sources: Default::default(),
|
||||
subscribers: SubscriberSet::new(),
|
||||
any_subscriber,
|
||||
reactivity: RwLock::new(MemoInnerReactivity {
|
||||
state: ReactiveNodeState::Dirty,
|
||||
sources: Default::default(),
|
||||
subscribers: SubscriberSet::new(),
|
||||
any_subscriber,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static, S> ReactiveNode for RwLock<MemoInner<T, S>>
|
||||
impl<T: 'static, S> ReactiveNode for MemoInner<T, S>
|
||||
where
|
||||
S: Storage<T>,
|
||||
{
|
||||
fn mark_dirty(&self) {
|
||||
self.write().or_poisoned().state = ReactiveNodeState::Dirty;
|
||||
self.reactivity.write().or_poisoned().state = ReactiveNodeState::Dirty;
|
||||
self.mark_subscribers_check();
|
||||
}
|
||||
|
||||
fn mark_check(&self) {
|
||||
{
|
||||
let mut lock = self.write().or_poisoned();
|
||||
let mut lock = self.reactivity.write().or_poisoned();
|
||||
if lock.state != ReactiveNodeState::Dirty {
|
||||
lock.state = ReactiveNodeState::Check;
|
||||
}
|
||||
}
|
||||
for sub in (&self.read().or_poisoned().subscribers).into_iter() {
|
||||
for sub in
|
||||
(&self.reactivity.read().or_poisoned().subscribers).into_iter()
|
||||
{
|
||||
sub.mark_check();
|
||||
}
|
||||
}
|
||||
|
||||
fn mark_subscribers_check(&self) {
|
||||
let lock = self.read().or_poisoned();
|
||||
let lock = self.reactivity.read().or_poisoned();
|
||||
for sub in (&lock.subscribers).into_iter() {
|
||||
sub.mark_check();
|
||||
}
|
||||
|
@ -85,7 +94,7 @@ where
|
|||
|
||||
fn update_if_necessary(&self) -> bool {
|
||||
let (state, sources) = {
|
||||
let inner = self.read().or_poisoned();
|
||||
let inner = self.reactivity.read().or_poisoned();
|
||||
(inner.state, inner.sources.clone())
|
||||
};
|
||||
|
||||
|
@ -94,32 +103,37 @@ where
|
|||
ReactiveNodeState::Dirty => true,
|
||||
ReactiveNodeState::Check => (&sources).into_iter().any(|source| {
|
||||
source.update_if_necessary()
|
||||
|| self.read().or_poisoned().state
|
||||
|| self.reactivity.read().or_poisoned().state
|
||||
== ReactiveNodeState::Dirty
|
||||
}),
|
||||
};
|
||||
|
||||
if needs_update {
|
||||
let (fun, value, owner) = {
|
||||
let mut lock = self.write().or_poisoned();
|
||||
(lock.fun.clone(), lock.value.take(), lock.owner.clone())
|
||||
};
|
||||
let fun = self.fun.clone();
|
||||
let owner = self.owner.clone();
|
||||
// No deadlock risk, because we only hold the value lock.
|
||||
let value = self.value.write().or_poisoned().take();
|
||||
|
||||
let any_subscriber =
|
||||
{ self.read().or_poisoned().any_subscriber.clone() };
|
||||
{ self.reactivity.read().or_poisoned().any_subscriber.clone() };
|
||||
any_subscriber.clear_sources(&any_subscriber);
|
||||
let (new_value, changed) = owner.with_cleanup(|| {
|
||||
any_subscriber
|
||||
.with_observer(|| fun(value.map(StorageAccess::into_taken)))
|
||||
});
|
||||
|
||||
let mut lock = self.write().or_poisoned();
|
||||
lock.value = Some(S::wrap(new_value));
|
||||
lock.state = ReactiveNodeState::Clean;
|
||||
// Two locks are aquired, so order matters.
|
||||
let mut reactivity_lock = self.reactivity.write().or_poisoned();
|
||||
{
|
||||
// Safety: Can block endlessly if the user is has a ReadGuard on the value
|
||||
let mut value_lock = self.value.write().or_poisoned();
|
||||
*value_lock = Some(S::wrap(new_value));
|
||||
}
|
||||
reactivity_lock.state = ReactiveNodeState::Clean;
|
||||
|
||||
if changed {
|
||||
let subs = lock.subscribers.clone();
|
||||
drop(lock);
|
||||
let subs = reactivity_lock.subscribers.clone();
|
||||
drop(reactivity_lock);
|
||||
for sub in subs {
|
||||
// don't trigger reruns of effects/memos
|
||||
// basically: if one of the observers has triggered this memo to
|
||||
|
@ -128,49 +142,54 @@ where
|
|||
sub.mark_dirty();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
drop(reactivity_lock);
|
||||
}
|
||||
|
||||
changed
|
||||
} else {
|
||||
if let Ok(mut lock) = self.try_write() {
|
||||
lock.state = ReactiveNodeState::Clean;
|
||||
}
|
||||
let mut lock = self.reactivity.write().or_poisoned();
|
||||
lock.state = ReactiveNodeState::Clean;
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static, S> Source for RwLock<MemoInner<T, S>>
|
||||
impl<T: 'static, S> Source for MemoInner<T, S>
|
||||
where
|
||||
S: Storage<T>,
|
||||
{
|
||||
fn add_subscriber(&self, subscriber: AnySubscriber) {
|
||||
if let Ok(mut lock) = self.try_write() {
|
||||
lock.subscribers.subscribe(subscriber);
|
||||
}
|
||||
let mut lock = self.reactivity.write().or_poisoned();
|
||||
lock.subscribers.subscribe(subscriber);
|
||||
}
|
||||
|
||||
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
|
||||
self.write()
|
||||
self.reactivity
|
||||
.write()
|
||||
.or_poisoned()
|
||||
.subscribers
|
||||
.unsubscribe(subscriber);
|
||||
}
|
||||
|
||||
fn clear_subscribers(&self) {
|
||||
self.write().or_poisoned().subscribers.take();
|
||||
self.reactivity.write().or_poisoned().subscribers.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static, S> Subscriber for RwLock<MemoInner<T, S>>
|
||||
impl<T: 'static, S> Subscriber for MemoInner<T, S>
|
||||
where
|
||||
S: Storage<T>,
|
||||
{
|
||||
fn add_source(&self, source: AnySource) {
|
||||
self.write().or_poisoned().sources.insert(source);
|
||||
self.reactivity.write().or_poisoned().sources.insert(source);
|
||||
}
|
||||
|
||||
fn clear_sources(&self, subscriber: &AnySubscriber) {
|
||||
self.write().or_poisoned().sources.clear_sources(subscriber);
|
||||
self.reactivity
|
||||
.write()
|
||||
.or_poisoned()
|
||||
.sources
|
||||
.clear_sources(subscriber);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use super::{inner::MemoInner, ArcMemo};
|
||||
use super::ArcMemo;
|
||||
use crate::{
|
||||
owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
|
||||
signal::{
|
||||
|
@ -306,7 +306,8 @@ where
|
|||
T: 'static,
|
||||
S: Storage<ArcMemo<T, S>> + Storage<T>,
|
||||
{
|
||||
type Value = ReadGuard<T, Mapped<Plain<MemoInner<T, S>>, T>>;
|
||||
type Value =
|
||||
ReadGuard<T, Mapped<Plain<Option<<S as Storage<T>>::Wrapped>>, T>>;
|
||||
|
||||
fn try_read_untracked(&self) -> Option<Self::Value> {
|
||||
self.inner
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
/// Types that abstract over signals with values that can be read.
|
||||
pub mod read {
|
||||
use crate::{
|
||||
computed::{ArcMemo, Memo, MemoInner},
|
||||
computed::{ArcMemo, Memo},
|
||||
graph::untrack,
|
||||
owner::{
|
||||
ArcStoredValue, ArenaItem, FromLocal, LocalStorage, Storage,
|
||||
|
@ -1735,22 +1735,40 @@ pub mod read {
|
|||
}
|
||||
|
||||
/// The content of a [`Signal`] wrapper read guard, variable depending on the signal type.
|
||||
#[derive(Debug)]
|
||||
pub enum SignalReadGuard<T: 'static, S: Storage<T>> {
|
||||
/// A read signal guard.
|
||||
Read(ReadGuard<T, Plain<T>>),
|
||||
#[allow(clippy::type_complexity)]
|
||||
/// A memo guard.
|
||||
Memo(ReadGuard<T, Mapped<Plain<MemoInner<T, S>>, T>>),
|
||||
Memo(
|
||||
ReadGuard<T, Mapped<Plain<Option<<S as Storage<T>>::Wrapped>>, T>>,
|
||||
),
|
||||
/// A fake guard for derived signals, the content had to actually be cloned, so it's not a guard but we pretend it is.
|
||||
Owned(T),
|
||||
}
|
||||
|
||||
impl<T: 'static + std::fmt::Debug, S: Storage<T> + std::fmt::Debug>
|
||||
std::fmt::Debug for SignalReadGuard<T, S>
|
||||
where
|
||||
<S as Storage<T>>::Wrapped: std::fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Read(arg0) => f.debug_tuple("Read").field(arg0).finish(),
|
||||
Self::Memo(arg0) => f.debug_tuple("Memo").field(arg0).finish(),
|
||||
Self::Owned(arg0) => {
|
||||
f.debug_tuple("Owned").field(arg0).finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, S> Clone for SignalReadGuard<T, S>
|
||||
where
|
||||
S: Storage<T>,
|
||||
T: Clone,
|
||||
Plain<T>: Clone,
|
||||
Mapped<Plain<MemoInner<T, S>>, T>: Clone,
|
||||
Mapped<Plain<Option<<S as Storage<T>>::Wrapped>>, T>: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
|
|
|
@ -444,6 +444,69 @@ fn unsync_derived_signal_and_memo() {
|
|||
assert_eq!(f.get_untracked(), 6);
|
||||
}
|
||||
|
||||
#[cfg(feature = "effects")]
|
||||
#[tokio::test]
|
||||
async fn test_memo_multiple_read_guards() {
|
||||
// regression test for https://github.com/leptos-rs/leptos/issues/3158
|
||||
let owner = Owner::new();
|
||||
owner.set();
|
||||
use imports::*;
|
||||
|
||||
_ = Executor::init_tokio();
|
||||
let owner = Owner::new();
|
||||
owner.set();
|
||||
task::LocalSet::new()
|
||||
.run_until(async {
|
||||
let memo = Memo::<i32>::new_with_compare(|_| 42, |_, _| true);
|
||||
|
||||
Effect::new(move |_| {
|
||||
let guard_a = memo.read();
|
||||
let guard_b = memo.read();
|
||||
assert_eq!(guard_a, 42);
|
||||
assert_eq!(guard_b, 42);
|
||||
});
|
||||
Executor::tick().await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(feature = "effects")]
|
||||
#[tokio::test]
|
||||
async fn test_memo_read_guard_held() {
|
||||
// regression test for https://github.com/leptos-rs/leptos/issues/3252
|
||||
let owner = Owner::new();
|
||||
owner.set();
|
||||
use imports::*;
|
||||
|
||||
_ = Executor::init_tokio();
|
||||
let owner = Owner::new();
|
||||
owner.set();
|
||||
task::LocalSet::new()
|
||||
.run_until(async {
|
||||
let source = RwSignal::new(0);
|
||||
|
||||
let directly_derived =
|
||||
Memo::new_with_compare(move |_| source.get(), |_, _| true);
|
||||
let indirect = Memo::new_with_compare(
|
||||
move |_| directly_derived.get(),
|
||||
|_, _| true,
|
||||
);
|
||||
|
||||
Effect::new(move |_| {
|
||||
let direct_value = directly_derived.read();
|
||||
let indirect_value = indirect.get();
|
||||
assert_eq!(direct_value, indirect_value);
|
||||
});
|
||||
|
||||
Executor::tick().await;
|
||||
source.set(1);
|
||||
Executor::tick().await;
|
||||
source.set(2);
|
||||
Executor::tick().await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn memo_updates_even_if_not_read_until_later() {
|
||||
#![allow(clippy::bool_assert_comparison)]
|
||||
|
|
Loading…
Reference in a new issue