fix: correct thread-local behavior for Effects (closes #2754)

This commit is contained in:
Greg Johnston 2024-08-01 19:30:43 -04:00
parent 805be42f7a
commit 5ad17d6b6c
2 changed files with 151 additions and 125 deletions

View file

@ -5,7 +5,7 @@ use crate::{
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
WithObserver,
},
owner::{LocalStorage, Owner, StoredValue},
owner::{LocalStorage, Owner, Storage, StoredValue, SyncStorage},
traits::Dispose,
};
use any_spawner::Executor;
@ -72,13 +72,17 @@ use std::{
/// this with a web framework, this generally means that effects **do not run on the server**.
/// and you can call browser-specific APIs within the effect function without causing issues.
/// If you need an effect to run on the server, use [`Effect::new_isomorphic`].
pub struct Effect {
inner: StoredValue<Option<Arc<RwLock<EffectInner>>>, LocalStorage>,
pub struct Effect<S> {
inner: Option<StoredValue<StoredEffect, S>>,
}
impl Dispose for Effect {
type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
impl<S> Dispose for Effect<S> {
fn dispose(self) {
self.inner.dispose()
if let Some(inner) = self.inner {
inner.dispose()
}
}
}
@ -100,12 +104,22 @@ fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
(rx, owner, inner)
}
impl Effect {
impl<S> Effect<S>
where
S: Storage<StoredEffect>,
{
/// Stops this effect before it is disposed.
pub fn stop(self) {
drop(self.inner.try_update_value(|inner| inner.take()));
if let Some(inner) = self
.inner
.and_then(|this| this.try_update_value(|inner| inner.take()))
{
drop(inner);
}
}
}
impl Effect<LocalStorage> {
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
@ -116,11 +130,11 @@ impl Effect {
where
T: 'static,
{
let (mut rx, owner, inner) = effect_base();
let value = Arc::new(RwLock::new(None::<T>));
let mut first_run = true;
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let value = Arc::new(RwLock::new(None::<T>));
let mut first_run = true;
if cfg!(feature = "effects") {
Executor::spawn_local({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
@ -145,100 +159,11 @@ impl Effect {
}
}
});
}
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This spawns a task that can be run on any thread. For an effect that will be spawned on
/// the current thread, use [`new`](Effect::new).
pub fn new_sync<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
if cfg!(feature = "effects") {
Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
}
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This will run whether the `effects` feature is enabled or not.
pub fn new_isomorphic<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber
.with_observer(|| subscriber.update_if_necessary())
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
StoredValue::new_with_storage(Some(inner))
});
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
Self { inner }
}
/// A version of [`Effect::new`] that only listens to any dependency
@ -340,12 +265,12 @@ impl Effect {
D: 'static,
T: 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
if cfg!(feature = "effects") {
Executor::spawn_local({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
@ -390,10 +315,102 @@ impl Effect {
}
}
});
}
StoredValue::new_with_storage(Some(inner))
});
Self { inner }
}
}
impl Effect<SyncStorage> {
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This spawns a task that can be run on any thread. For an effect that will be spawned on
/// the current thread, use [`new`](Effect::new).
pub fn new_sync<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
StoredValue::new_with_storage(Some(inner))
});
Self { inner }
}
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This will run whether the `effects` feature is enabled or not.
pub fn new_isomorphic<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber
.with_observer(|| subscriber.update_if_necessary())
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
Self {
inner: StoredValue::new_with_storage(Some(inner)),
inner: Some(StoredValue::new_with_storage(Some(inner))),
}
}
@ -415,7 +432,7 @@ impl Effect {
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
if cfg!(feature = "effects") {
let inner = cfg!(feature = "effects").then(|| {
Executor::spawn({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
@ -460,22 +477,28 @@ impl Effect {
}
}
});
}
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
StoredValue::new_with_storage(Some(inner))
});
Self { inner }
}
}
impl ToAnySubscriber for Effect {
impl<S> ToAnySubscriber for Effect<S>
where
S: Storage<StoredEffect>,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.inner
.try_with_value(|inner| {
inner.as_ref().map(|inner| inner.to_any_subscriber())
.and_then(|inner| {
inner
.try_with_value(|inner| {
inner.as_ref().map(|inner| inner.to_any_subscriber())
})
.flatten()
})
.flatten()
.expect("tried to subscribe to effect that has been stopped")
.expect("tried to set effect that has been stopped")
}
}
@ -484,7 +507,9 @@ impl ToAnySubscriber for Effect {
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust \
idioms.Please use `Effect::new()` instead."]
pub fn create_effect<T>(fun: impl FnMut(Option<T>) -> T + 'static) -> Effect
pub fn create_effect<T>(
fun: impl FnMut(Option<T>) -> T + 'static,
) -> Effect<LocalStorage>
where
T: 'static,
{

View file

@ -8,11 +8,12 @@ use crate::{
use or_poisoned::OrPoisoned;
use std::sync::{Arc, RwLock, Weak};
/// Handles internal subscription logic for effects.
#[derive(Debug)]
pub(crate) struct EffectInner {
pub dirty: bool,
pub observer: Sender,
pub sources: SourceSet,
pub struct EffectInner {
pub(crate) dirty: bool,
pub(crate) observer: Sender,
pub(crate) sources: SourceSet,
}
impl ToAnySubscriber for Arc<RwLock<EffectInner>> {