feat: add Effect::watch (#2732)

This commit is contained in:
Marc-Stefan Cassola 2024-07-29 14:30:18 +01:00 committed by Greg Johnston
parent 11011c2bda
commit c22f20ac28
2 changed files with 397 additions and 0 deletions

View file

@ -240,6 +240,232 @@ impl Effect {
inner: StoredValue::new_with_storage(Some(inner)),
}
}
/// A version of [`Effect::new`] that only listens to any dependency
/// that is accessed inside `dependency_fn`.
///
/// The return value of `dependency_fn` is passed into `handler` as an argument together with the previous value.
/// Additionally, the last return value of `handler` is provided as a third argument, as is done in [`Effect::new`].
///
/// ## Usage
///
/// ```
/// # use reactive_graph::effect::Effect;
/// # use reactive_graph::traits::*;
/// # use reactive_graph::signal::signal;
/// # tokio_test::block_on(async move {
/// # tokio::task::LocalSet::new().run_until(async move {
/// #
/// let (num, set_num) = signal(0);
///
/// let effect = Effect::watch(
/// move || num.get(),
/// move |num, prev_num, _| {
/// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
/// },
/// false,
/// );
///
/// set_num.set(1); // > "Number: 1; Prev: Some(0)"
///
/// effect.stop(); // stop watching
///
/// set_num.set(2); // (nothing happens)
/// # });
/// # });
/// ```
///
/// The callback itself doesn't track any signal that is accessed within it.
///
/// ```
/// # use reactive_graph::effect::Effect;
/// # use reactive_graph::traits::*;
/// # use reactive_graph::signal::signal;
/// # tokio_test::block_on(async move {
/// # tokio::task::LocalSet::new().run_until(async move {
/// #
/// let (num, set_num) = signal(0);
/// let (cb_num, set_cb_num) = signal(0);
///
/// Effect::watch(
/// move || num.get(),
/// move |num, _, _| {
/// // log::debug!("Number: {}; Cb: {}", num, cb_num.get());
/// },
/// false,
/// );
///
/// set_num.set(1); // > "Number: 1; Cb: 0"
///
/// set_cb_num.set(1); // (nothing happens)
///
/// set_num.set(2); // > "Number: 2; Cb: 1"
/// # });
/// # });
/// ```
///
/// ## Immediate
///
/// If the final parameter `immediate` is true, the `callback` will run immediately.
/// If it's `false`, the `callback` will run only after
/// the first change is detected of any signal that is accessed in `deps`.
///
/// ```
/// # use reactive_graph::effect::Effect;
/// # use reactive_graph::traits::*;
/// # use reactive_graph::signal::signal;
/// # tokio_test::block_on(async move {
/// # tokio::task::LocalSet::new().run_until(async move {
/// #
/// let (num, set_num) = signal(0);
///
/// Effect::watch(
/// move || num.get(),
/// move |num, prev_num, _| {
/// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
/// },
/// true,
/// ); // > "Number: 0; Prev: None"
///
/// set_num.set(1); // > "Number: 1; Prev: Some(0)"
/// # });
/// # });
/// ```
pub fn watch<D, T>(
mut dependency_fn: impl FnMut() -> D + 'static,
mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
immediate: bool,
) -> Self
where
D: 'static,
T: 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
if cfg!(feature = "effects") {
Executor::spawn_local({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
subscriber.clear_sources(&subscriber);
let old_dep_value = mem::take(
&mut *dep_value.write().or_poisoned(),
);
let new_dep_value = owner.with_cleanup(|| {
subscriber.with_observer(|| dependency_fn())
});
let old_watch_value = mem::take(
&mut *watch_value.write().or_poisoned(),
);
if immediate || !first_run {
let new_watch_value = handler(
&new_dep_value,
old_dep_value.as_ref(),
old_watch_value,
);
*watch_value.write().or_poisoned() =
Some(new_watch_value);
}
*dep_value.write().or_poisoned() =
Some(new_dep_value);
first_run = false;
}
}
}
});
}
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}
/// This is to [`Effect::watch`] what [`Effect::new_sync`] is to [`Effect::new`].
pub fn watch_sync<D, T>(
mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
+ Send
+ Sync
+ 'static,
immediate: bool,
) -> Self
where
D: Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
if cfg!(feature = "effects") {
Executor::spawn({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
subscriber.clear_sources(&subscriber);
let old_dep_value = mem::take(
&mut *dep_value.write().or_poisoned(),
);
let new_dep_value = owner.with_cleanup(|| {
subscriber.with_observer(|| dependency_fn())
});
let old_watch_value = mem::take(
&mut *watch_value.write().or_poisoned(),
);
if immediate || !first_run {
let new_watch_value = handler(
&new_dep_value,
old_dep_value.as_ref(),
old_watch_value,
);
*watch_value.write().or_poisoned() =
Some(new_watch_value);
}
*dep_value.write().or_poisoned() =
Some(new_dep_value);
first_run = false;
}
}
}
});
}
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}
}
impl ToAnySubscriber for Effect {

View file

@ -0,0 +1,171 @@
#[cfg(feature = "effects")]
use any_spawner::Executor;
#[cfg(feature = "effects")]
use reactive_graph::{effect::Effect, prelude::*, signal::RwSignal};
#[cfg(feature = "effects")]
use std::sync::{Arc, RwLock};
#[cfg(feature = "effects")]
use tokio::task;
#[cfg(feature = "effects")]
#[tokio::test]
async fn watch_runs() {
_ = Executor::init_tokio();
task::LocalSet::new()
.run_until(async {
let a = RwSignal::new(-1);
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
let effect = Effect::watch(
move || a.get(),
{
let b = b.clone();
move |a, prev_a, prev_ret| {
let formatted = format!(
"Value is {a}; Prev is {prev_a:?}; Prev return is \
{prev_ret:?}"
);
*b.write().unwrap() = formatted;
a + 10
}
},
false,
);
Executor::tick().await;
assert_eq!(b.read().unwrap().as_str(), "");
a.set(1);
Executor::tick().await;
assert_eq!(
b.read().unwrap().as_str(),
"Value is 1; Prev is Some(-1); Prev return is None"
);
a.set(2);
Executor::tick().await;
assert_eq!(
b.read().unwrap().as_str(),
"Value is 2; Prev is Some(1); Prev return is Some(11)"
);
effect.stop();
*b.write().unwrap() = "nothing happened".to_string();
a.set(3);
Executor::tick().await;
assert_eq!(b.read().unwrap().as_str(), "nothing happened");
})
.await
}
#[cfg(feature = "effects")]
#[tokio::test]
async fn watch_runs_immediately() {
_ = Executor::init_tokio();
task::LocalSet::new()
.run_until(async {
let a = RwSignal::new(-1);
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
Effect::watch(
move || a.get(),
{
let b = b.clone();
move |a, prev_a, prev_ret| {
let formatted = format!(
"Value is {a}; Prev is {prev_a:?}; Prev return is \
{prev_ret:?}"
);
*b.write().unwrap() = formatted;
a + 10
}
},
true,
);
Executor::tick().await;
assert_eq!(
b.read().unwrap().as_str(),
"Value is -1; Prev is None; Prev return is None"
);
a.set(1);
Executor::tick().await;
assert_eq!(
b.read().unwrap().as_str(),
"Value is 1; Prev is Some(-1); Prev return is Some(9)"
);
})
.await
}
#[cfg(feature = "effects")]
#[tokio::test]
async fn watch_ignores_callback() {
_ = Executor::init_tokio();
task::LocalSet::new()
.run_until(async {
let a = RwSignal::new(-1);
let b = RwSignal::new(0);
// simulate an arbitrary side effect
let s = Arc::new(RwLock::new(String::new()));
Effect::watch(
move || a.get(),
{
let s = s.clone();
move |a, _, _| {
let formatted =
format!("Value a is {a}; Value b is {}", b.get());
*s.write().unwrap() = formatted;
a + 10
}
},
false,
);
Executor::tick().await;
a.set(1);
Executor::tick().await;
assert_eq!(
s.read().unwrap().as_str(),
"Value a is 1; Value b is 0"
);
*s.write().unwrap() = "nothing happened".to_string();
b.set(10);
Executor::tick().await;
assert_eq!(s.read().unwrap().as_str(), "nothing happened");
a.set(2);
Executor::tick().await;
assert_eq!(
s.read().unwrap().as_str(),
"Value a is 2; Value b is 10"
);
})
.await
}