start working on porting over docs and tests and 0.7...

This commit is contained in:
Greg Johnston 2024-05-11 20:38:14 -04:00
parent b881167b8f
commit e514f7144d
7 changed files with 299 additions and 193 deletions

View file

@ -28,6 +28,7 @@ web-sys = "0.3"
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "time", "macros"] }
tokio-test = { version = "0.4" }
any_spawner = { workspace = true, features = ["futures-executor", "tokio"] }
[features]

View file

@ -1,14 +1,91 @@
use crate::{
computed::ArcMemo,
diagnostics::is_suppressing_resource_load,
owner::{Owner, StoredValue},
signal::{ArcRwSignal, RwSignal},
traits::{DefinedAt, Dispose, GetUntracked, Update},
traits::{DefinedAt, Dispose, Get, GetUntracked, Update},
unwrap_signal,
};
use any_spawner::Executor;
use futures::{channel::oneshot, select, FutureExt};
use std::{future::Future, panic::Location, pin::Pin, sync::Arc};
/// An action run some asynchronous code when you dispatch a new value to it, and gives you
/// reactive access to the result.
///
/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
/// creating an action and immediately dispatching a value to it, this is probably the wrong
/// primitive.
///
/// The arena-allocated, `Copy` version of an `ArcAction` is an [`Action`].
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let save_data = ArcAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// // the argument currently running
/// let input = save_data.input();
/// // the most recent returned result
/// let result_of_call = save_data.value();
/// // whether the call is pending
/// let pending = save_data.pending();
/// // how many times the action has run
/// // useful for reactively updating something else in response to a `dispatch` and response
/// let version = save_data.version();
///
/// // before we do anything
/// assert_eq!(input.get(), None); // no argument yet
/// assert_eq!(pending.get(), false); // isn't pending a response
/// assert_eq!(result_of_call.get(), None); // there's no "last value"
/// assert_eq!(version.get(), 0);
///
/// // dispatch the action
/// save_data.dispatch("My todo".to_string());
///
/// // when we're making the call
/// assert_eq!(input.get(), Some("My todo".to_string()));
/// assert_eq!(pending.get(), true); // is pending
/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
///
/// # tokio::time::sleep(std::time::Duration::from_millis(25)).await;
///
/// // after call has resolved
/// assert_eq!(input.get(), None); // input clears out after resolved
/// assert_eq!(pending.get(), false); // no longer pending
/// assert_eq!(result_of_call.get(), Some(42));
/// assert_eq!(version.get(), 1);
/// # });
/// ```
///
/// The input to the `async` function should always be a single value,
/// but it can be of any type. The argument is always passed by reference to the
/// function, because it is stored in [Action::input] as well.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// // if there's a single argument, just use that
/// let action1 = ArcAction::new(|input: &String| {
/// let input = input.clone();
/// async move { todo!() }
/// });
///
/// // if there are no arguments, use the unit type `()`
/// let action2 = ArcAction::new(|input: &()| async { todo!() });
///
/// // if there are multiple arguments, use a tuple
/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
pub struct ArcAction<I, O>
where
I: 'static,
@ -49,6 +126,46 @@ where
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// Creates a new action. Thi is lazy: it does not run the action function until some value
/// is dispatched.
///
/// The constructor takes a function which will create a new `Future` from some input data.
/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
/// be spawned.
///
/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
/// `Future` must be `Send` so that it can be moved across threads by the async executor as
/// needed.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// act.dispatch(3);
/// assert_eq!(act.input().get(), Some(3));
///
/// // Remember that async functions already return a future if they are
/// // not `await`ed. You can save keystrokes by leaving out the `async move`
///
/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
/// act2.dispatch(String::from("i'm in a doctest"));
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
///
/// // after it resolves
/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
///
/// async fn yell(n: String) -> String {
/// n.to_uppercase()
/// }
/// # });
/// ```
#[track_caller]
pub fn new<F, Fu>(action_fn: F) -> Self
where
@ -68,52 +185,52 @@ where
#[track_caller]
pub fn dispatch(&self, input: I) {
if !is_suppressing_resource_load() {
let mut fut = (self.action_fn)(&input).fuse();
//if !is_suppressing_resource_load() {
let mut fut = (self.action_fn)(&input).fuse();
// abort this task if the owner is cleaned up
let (abort_tx, mut abort_rx) = oneshot::channel();
Owner::on_cleanup(move || {
abort_tx.send(()).expect(
"tried to cancel a future in ArcAction::dispatch(), but \
the channel has already closed",
);
});
// abort this task if the owner is cleaned up
let (abort_tx, mut abort_rx) = oneshot::channel();
Owner::on_cleanup(move || {
abort_tx.send(()).expect(
"tried to cancel a future in ArcAction::dispatch(), but the \
channel has already closed",
);
});
// Update the state before loading
self.in_flight.update(|n| *n += 1);
let current_version =
self.version.try_get_untracked().unwrap_or_default();
self.input.try_update(|inp| *inp = Some(input));
// Update the state before loading
self.in_flight.update(|n| *n += 1);
let current_version =
self.version.try_get_untracked().unwrap_or_default();
self.input.try_update(|inp| *inp = Some(input));
// Spawn the task
Executor::spawn({
let input = self.input.clone();
let version = self.version.clone();
let value = self.value.clone();
let in_flight = self.in_flight.clone();
async move {
select! {
// if the abort message has been sent, bail and do nothing
_ = abort_rx => {
in_flight.update(|n| *n = n.saturating_sub(1));
},
// otherwise, update the value
result = fut => {
in_flight.update(|n| *n = n.saturating_sub(1));
let is_latest = version.get_untracked() <= current_version;
if is_latest {
version.update(|n| *n += 1);
value.update(|n| *n = Some(result));
}
if in_flight.get_untracked() == 0 {
input.update(|inp| *inp = None);
}
// Spawn the task
Executor::spawn({
let input = self.input.clone();
let version = self.version.clone();
let value = self.value.clone();
let in_flight = self.in_flight.clone();
async move {
select! {
// if the abort message has been sent, bail and do nothing
_ = abort_rx => {
in_flight.update(|n| *n = n.saturating_sub(1));
},
// otherwise, update the value
result = fut => {
in_flight.update(|n| *n = n.saturating_sub(1));
let is_latest = version.get_untracked() <= current_version;
if is_latest {
version.update(|n| *n += 1);
value.update(|n| *n = Some(result));
}
if in_flight.get_untracked() == 0 {
input.update(|inp| *inp = None);
}
}
}
});
}
}
});
//}
}
}
@ -132,6 +249,12 @@ impl<I, O> ArcAction<I, O> {
pub fn value(&self) -> ArcRwSignal<Option<O>> {
self.value.clone()
}
#[track_caller]
pub fn pending(&self) -> ArcMemo<bool> {
let in_flight = self.in_flight.clone();
ArcMemo::new(move |_| in_flight.get() > 0)
}
}
impl<I, O> DefinedAt for ArcAction<I, O>

View file

@ -1,3 +1,5 @@
//! Reactive primitives to asynchronously update some value.
mod action;
mod multi_action;
pub use action::*;

View file

@ -1,24 +1,14 @@
// The point of these diagnostics is to give useful error messages when someone
// tries to access a reactive variable outside the reactive scope. They track when
// you create a signal/memo, and where you access it non-reactively.
//! By default, attempting to [`Track`](crate::traits::Track) a signal when you are not in a
//! reactive tracking context will cause a warning when you are in debug mode.
//!
//! In some cases, this warning is a false positive. For example, inside an event listener in a
//! user interface, you never want to read from a signal reactively; the event listener should run
//! when the event fires, not when a signal read in the event listener changes.
//!
//! This module provides utilities to suppress those warnings by entering a
//! [`SpecialNonReactiveZone`].
#[cfg(debug_assertions)]
#[allow(dead_code)] // allowed for SSR
#[derive(Copy, Clone)]
pub(crate) struct AccessDiagnostics {
pub defined_at: &'static std::panic::Location<'static>,
pub called_at: &'static std::panic::Location<'static>,
}
#[cfg(not(debug_assertions))]
#[derive(Copy, Clone, Default)]
pub(crate) struct AccessDiagnostics;
/// This just tracks whether we're currently in a context in which it really doesn't
/// matter whether something is reactive: for example, in an event listener or timeout.
/// Entering this zone basically turns off the warnings, and exiting it turns them back on.
/// All of this is a no-op in release mode.
#[doc(hidden)]
/// Marks an execution block that is known not to be reactive, and suppresses warnings.
#[derive(Debug)]
pub struct SpecialNonReactiveZone;
@ -33,6 +23,13 @@ thread_local! {
}
impl SpecialNonReactiveZone {
/// Suppresses warnings about non-reactive accesses until the guard is dropped.
pub fn enter() -> SpecialNonReactiveZoneGuard {
IS_SPECIAL_ZONE.set(true);
SpecialNonReactiveZoneGuard
}
#[cfg(debug_assertions)]
#[inline(always)]
pub(crate) fn is_inside() -> bool {
if cfg!(debug_assertions) {
@ -41,11 +38,6 @@ impl SpecialNonReactiveZone {
false
}
}
pub fn enter() -> SpecialNonReactiveZoneGuard {
IS_SPECIAL_ZONE.set(true);
SpecialNonReactiveZoneGuard
}
}
impl Drop for SpecialNonReactiveZoneGuard {
@ -54,24 +46,6 @@ impl Drop for SpecialNonReactiveZoneGuard {
}
}
#[doc(hidden)]
#[macro_export]
macro_rules! diagnostics {
($this:ident) => {{
#[cfg(debug_assertions)]
{
AccessDiagnostics {
defined_at: $this.defined_at,
called_at: std::panic::Location::caller(),
}
}
#[cfg(not(debug_assertions))]
{
AccessDiagnostics
}
}};
}
thread_local! {
static SUPPRESS_RESOURCE_LOAD: Cell<bool> = const { Cell::new(false) };
}

View file

@ -100,28 +100,30 @@ impl<T: Source + ToAnySource + DefinedAt> Track for T {
{
use crate::diagnostics::SpecialNonReactiveZone;
//if !SpecialNonReactiveZone::is_inside() {
let called_at = Location::caller();
let ty = std::any::type_name::<T>();
let defined_at = self
.defined_at()
.map(ToString::to_string)
.unwrap_or_else(|| String::from("{unknown}"));
crate::log_warning(format_args!(
"At {called_at}, you access a {ty} (defined at \
{defined_at}) outside a reactive tracking context. This \
might mean your app is not responding to changes in \
signal values in the way you expect.\n\nHeres how to \
fix it:\n\n1. If this is inside a `view!` macro, make \
sure you are passing a function, not a value.\n NO \
<p>{{x.get() * 2}}</p>\n YES <p>{{move || x.get() * \
2}}</p>\n\n2. If its in the body of a component, try \
wrapping this access in a closure: \n NO let y = \
x.get() * 2\n YES let y = move || x.get() * 2.\n\n3. \
If youre *trying* to access the value without tracking, \
use `.get_untracked()` or `.with_untracked()` instead."
));
//}
if !SpecialNonReactiveZone::is_inside() {
let called_at = Location::caller();
let ty = std::any::type_name::<T>();
let defined_at = self
.defined_at()
.map(ToString::to_string)
.unwrap_or_else(|| String::from("{unknown}"));
crate::log_warning(format_args!(
"At {called_at}, you access a {ty} (defined at \
{defined_at}) outside a reactive tracking context. \
This might mean your app is not responding to \
changes in signal values in the way you \
expect.\n\nHeres how to fix it:\n\n1. If this is \
inside a `view!` macro, make sure you are passing a \
function, not a value.\n NO <p>{{x.get() * \
2}}</p>\n YES <p>{{move || x.get() * \
2}}</p>\n\n2. If its in the body of a component, \
try wrapping this access in a closure: \n NO \
let y = x.get() * 2\n YES let y = move || \
x.get() * 2.\n\n3. If youre *trying* to access the \
value without tracking, use `.get_untracked()` or \
`.with_untracked()` instead."
));
}
}
}
}

View file

@ -18,7 +18,6 @@ async fn arc_async_derived_calculates_eagerly() {
});
assert_eq!(value.clone().await, 42);
std::mem::forget(value);
}
#[tokio::test]
@ -41,7 +40,6 @@ async fn arc_async_derived_tracks_signal_change() {
signal.set(50);
sleep(Duration::from_millis(5)).await;
assert_eq!(value.clone().await, 50);
std::mem::forget(value);
}
#[tokio::test]

View file

@ -8,6 +8,7 @@ use std::{
mem,
sync::{Arc, RwLock},
};
use tokio::task;
pub async fn tick() {
tokio::time::sleep(std::time::Duration::from_micros(1)).await;
@ -16,122 +17,127 @@ pub async fn tick() {
#[tokio::test]
async fn render_effect_runs() {
_ = Executor::init_tokio();
task::LocalSet::new()
.run_until(async {
let a = RwSignal::new(-1);
Executor::spawn(async {
let a = RwSignal::new(-1);
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
// we forget it so it continues running
// if it's dropped, it will stop listening
mem::forget(RenderEffect::new({
let b = b.clone();
move |_| {
let formatted = format!("Value is {}", a.get());
*b.write().unwrap() = formatted;
}
}));
// we forget it so it continues running
// if it's dropped, it will stop listening
mem::forget(RenderEffect::new({
let b = b.clone();
move |_| {
let formatted = format!("Value is {}", a.get());
*b.write().unwrap() = formatted;
}
}));
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is -1");
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is -1");
println!("setting to 1");
a.set(1);
println!("setting to 1");
a.set(1);
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is 1");
});
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is 1");
})
.await;
}
#[tokio::test]
async fn effect_runs() {
_ = Executor::init_tokio();
Executor::spawn(async {
let a = RwSignal::new(-1);
task::LocalSet::new()
.run_until(async {
let a = RwSignal::new(-1);
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
// simulate an arbitrary side effect
let b = Arc::new(RwLock::new(String::new()));
Effect::new({
let b = b.clone();
move |_| {
let formatted = format!("Value is {}", a.get());
*b.write().unwrap() = formatted;
}
});
Effect::new({
let b = b.clone();
move |_| {
let formatted = format!("Value is {}", a.get());
*b.write().unwrap() = formatted;
}
});
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is -1");
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is -1");
println!("setting to 1");
a.set(1);
println!("setting to 1");
a.set(1);
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is 1");
});
tick().await;
assert_eq!(b.read().unwrap().as_str(), "Value is 1");
})
.await
}
#[tokio::test]
async fn dynamic_dependencies() {
_ = Executor::init_tokio();
Executor::spawn(async {
let first = RwSignal::new("Greg");
let last = RwSignal::new("Johnston");
let use_last = RwSignal::new(true);
task::LocalSet::new()
.run_until(async {
let first = RwSignal::new("Greg");
let last = RwSignal::new("Johnston");
let use_last = RwSignal::new(true);
let combined_count = Arc::new(RwLock::new(0));
let combined_count = Arc::new(RwLock::new(0));
mem::forget(RenderEffect::new({
let combined_count = Arc::clone(&combined_count);
move |_| {
*combined_count.write().unwrap() += 1;
if use_last.get() {
println!("{} {}", first.get(), last.get());
} else {
println!("{}", first.get());
mem::forget(RenderEffect::new({
let combined_count = Arc::clone(&combined_count);
move |_| {
*combined_count.write().unwrap() += 1;
if use_last.get() {
println!("{} {}", first.get(), last.get());
} else {
println!("{}", first.get());
}
}
}
}));
}));
tick().await;
assert_eq!(*combined_count.read().unwrap(), 1);
tick().await;
assert_eq!(*combined_count.read().unwrap(), 1);
println!("\nsetting `first` to Bob");
first.set("Bob");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 2);
println!("\nsetting `first` to Bob");
first.set("Bob");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 2);
println!("\nsetting `last` to Bob");
last.set("Thompson");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 3);
println!("\nsetting `last` to Bob");
last.set("Thompson");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 3);
println!("\nsetting `use_last` to false");
use_last.set(false);
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `use_last` to false");
use_last.set(false);
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Jones");
last.set("Jones");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Jones");
last.set("Jones");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Jones");
last.set("Smith");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Jones");
last.set("Smith");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Stevens");
last.set("Stevens");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `last` to Stevens");
last.set("Stevens");
tick().await;
assert_eq!(*combined_count.read().unwrap(), 4);
println!("\nsetting `use_last` to true");
use_last.set(true);
tick().await;
assert_eq!(*combined_count.read().unwrap(), 5);
});
println!("\nsetting `use_last` to true");
use_last.set(true);
tick().await;
assert_eq!(*combined_count.read().unwrap(), 5);
})
.await
}