make effect system async to make signals send + sync

This commit is contained in:
Evan Almloff 2023-11-06 18:47:51 -06:00
parent c33276e0a0
commit 466005890d
8 changed files with 195 additions and 86 deletions

View file

@ -142,6 +142,24 @@ fn fuzz() {
}
}
/// The type erased id of a generational box.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct GenerationalBoxId {
data_ptr: *const (),
#[cfg(any(debug_assertions, feature = "check_generation"))]
generation: u32,
}
impl Debug for GenerationalBoxId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[cfg(any(debug_assertions, feature = "check_generation"))]
f.write_fmt(format_args!("{:?}@{:?}", self.data_ptr, self.generation))?;
#[cfg(not(any(debug_assertions, feature = "check_generation")))]
f.write_fmt(format_args!("{:?}", self.data_ptr))?;
Ok(())
}
}
/// The core Copy state type. The generational box will be dropped when the [Owner] is dropped.
pub struct GenerationalBox<T, S = UnsyncStorage> {
raw: MemoryLocation<S>,
@ -180,6 +198,15 @@ impl<T: 'static, S: Storage<T>> GenerationalBox<T, S> {
}
}
/// Get the id of the generational box.
pub fn id(&self) -> GenerationalBoxId {
GenerationalBoxId {
data_ptr: self.raw.data.data_ptr(),
#[cfg(any(debug_assertions, feature = "check_generation"))]
generation: self.generation,
}
}
/// Try to read the value. Returns None if the value is no longer valid.
pub fn try_read(&self) -> Option<S::Ref> {
self.validate().then(|| self.raw.data.try_read()).flatten()

View file

@ -14,6 +14,9 @@ simple_logger = "4.2.0"
serde = { version = "1", features = ["derive"], optional = true }
parking_lot = "0.12.1"
once_cell = "1.18.0"
rustc-hash.workspace = true
futures-channel.workspace = true
futures-util.workspace = true
[dev-dependencies]
dioxus = { workspace = true }
@ -22,4 +25,4 @@ tokio = { version = "1", features = ["full"] }
[features]
default = []
serialize = ["serde"]
serialize = ["serde"]

View file

@ -1,20 +1,29 @@
use core::{self, fmt::Debug};
use dioxus_core::prelude::*;
use generational_box::UnsyncStorage;
use futures_channel::mpsc::UnboundedSender;
use futures_util::StreamExt;
use generational_box::GenerationalBoxId;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fmt::{self, Formatter};
use crate::use_signal;
use crate::{dependency::Dependency, CopyValue};
#[derive(Copy, Clone, PartialEq)]
thread_local! {
pub(crate)static EFFECT_STACK: EffectStack = EffectStack::default();
}
pub(crate) struct EffectStack {
pub(crate) effects: CopyValue<Vec<Effect>, UnsyncStorage>,
pub(crate) effects: RwLock<Vec<Effect>>,
pub(crate) effect_mapping: RwLock<FxHashMap<GenerationalBoxId, Effect>>,
}
impl Default for EffectStack {
fn default() -> Self {
Self {
effects: CopyValue::new_in_scope(Vec::new(), ScopeId::ROOT),
effects: RwLock::new(Vec::new()),
effect_mapping: RwLock::new(FxHashMap::default()),
}
}
}
@ -25,13 +34,38 @@ impl EffectStack {
}
}
pub(crate) fn get_effect_stack() -> EffectStack {
/// This is a thread safe reference to an effect stack running on another thread.
#[derive(Clone)]
pub(crate) struct EffectStackRef {
rerun_effect: UnboundedSender<GenerationalBoxId>,
}
impl EffectStackRef {
pub(crate) fn rerun_effect(&self, id: GenerationalBoxId) {
self.rerun_effect.unbounded_send(id).unwrap();
}
}
pub(crate) fn get_effect_ref() -> EffectStackRef {
match consume_context() {
Some(rt) => rt,
None => {
let store = EffectStack::default();
provide_root_context(store);
store
let (sender, mut receiver) = futures_channel::mpsc::unbounded();
spawn(async move {
while let Some(id) = receiver.next().await {
EFFECT_STACK.with(|stack| {
let effect_mapping = stack.effect_mapping.read();
if let Some(effect) = effect_mapping.get(&id) {
effect.try_run();
}
});
}
});
let stack_ref = EffectStackRef {
rerun_effect: sender,
};
provide_root_context(stack_ref.clone());
stack_ref
}
}
}
@ -68,19 +102,43 @@ pub fn use_effect_with_dependencies<D: Dependency>(
#[derive(Copy, Clone, PartialEq)]
pub struct Effect {
pub(crate) source: ScopeId,
pub(crate) callback: CopyValue<Box<dyn FnMut()>, UnsyncStorage>,
pub(crate) effect_stack: EffectStack,
pub(crate) inner: CopyValue<EffectInner>,
}
pub(crate) struct EffectInner {
pub(crate) callback: Box<dyn FnMut()>,
pub(crate) id: GenerationalBoxId,
}
impl EffectInner {
pub(crate) fn new(callback: Box<dyn FnMut()>) -> CopyValue<Self> {
let copy = CopyValue::invalid();
let inner = EffectInner {
callback: Box::new(callback),
id: copy.id(),
};
copy.set(inner);
copy
}
}
impl Drop for EffectInner {
fn drop(&mut self) {
EFFECT_STACK.with(|stack| {
stack.effect_mapping.write().remove(&self.id);
});
}
}
impl Debug for Effect {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("{:?}", self.callback.value))
f.write_fmt(format_args!("{:?}", self.inner.value))
}
}
impl Effect {
pub(crate) fn current() -> Option<Self> {
get_effect_stack().effects.read().last().copied()
EFFECT_STACK.with(|stack| stack.effects.read().last().copied())
}
/// Create a new effect. The effect will be run immediately and whenever any signal it reads changes.
@ -89,10 +147,16 @@ impl Effect {
pub fn new(callback: impl FnMut() + 'static) -> Self {
let myself = Self {
source: current_scope_id().expect("in a virtual dom"),
callback: CopyValue::new(Box::new(callback)),
effect_stack: get_effect_stack(),
inner: EffectInner::new(Box::new(callback)),
};
EFFECT_STACK.with(|stack| {
stack
.effect_mapping
.write()
.insert(myself.inner.id(), myself);
});
myself.try_run();
myself
@ -100,13 +164,17 @@ impl Effect {
/// Run the effect callback immediately. Returns `true` if the effect was run. Returns `false` is the effect is dead.
pub fn try_run(&self) {
if let Some(mut callback) = self.callback.try_write() {
if let Some(mut inner) = self.inner.try_write() {
{
self.effect_stack.effects.write().push(*self);
EFFECT_STACK.with(|stack| {
stack.effects.write().push(*self);
});
}
callback();
(inner.callback)();
{
self.effect_stack.effects.write().pop();
EFFECT_STACK.with(|stack| {
stack.effects.write().pop();
});
}
}
}

View file

@ -3,7 +3,6 @@ use crate::signal::{ReadOnlySignal, Signal, Write};
use crate::SignalData;
use generational_box::Mappable;
use generational_box::{MappableMut, Storage};
use std::ops::Deref;
use std::{
fmt::{Debug, Display},

View file

@ -2,6 +2,7 @@
#![doc(html_logo_url = "https://avatars.githubusercontent.com/u/79236386")]
#![doc(html_favicon_url = "https://avatars.githubusercontent.com/u/79236386")]
#![warn(missing_docs)]
#![allow(clippy::type_complexity)]
mod rt;
pub use rt::*;

View file

@ -1,3 +1,4 @@
use generational_box::GenerationalBoxId;
use generational_box::UnsyncStorage;
use std::mem::MaybeUninit;
use std::ops::Deref;
@ -6,40 +7,37 @@ use std::rc::Rc;
use dioxus_core::prelude::*;
use dioxus_core::ScopeId;
use generational_box::AnyStorage;
use generational_box::Storage;
use generational_box::{GenerationalBox, Owner};
use crate::Effect;
fn current_owner<S: Storage<T>, T>() -> Rc<Owner<S>> {
todo!()
// match Effect::current() {
// // If we are inside of an effect, we should use the owner of the effect as the owner of the value.
// Some(effect) => {
// let scope_id = effect.source;
// owner_in_scope(scope_id)
// }
// // Otherwise either get an owner from the current scope or create a new one.
// None => match has_context() {
// Some(rt) => rt,
// None => {
// let owner = Rc::new(current_store().owner());
// provide_context(owner).expect("in a virtual dom")
// }
// },
// }
match Effect::current() {
// If we are inside of an effect, we should use the owner of the effect as the owner of the value.
Some(effect) => {
let scope_id = effect.source;
owner_in_scope(scope_id)
}
// Otherwise either get an owner from the current scope or create a new one.
None => match has_context() {
Some(rt) => rt,
None => {
let owner = Rc::new(S::owner());
provide_context(owner).expect("in a virtual dom")
}
},
}
}
fn owner_in_scope<S: Storage<T>, T>(scope: ScopeId) -> Rc<Owner<S>> {
todo!()
// match consume_context_from_scope(scope) {
// Some(rt) => rt,
// None => {
// let owner = Rc::new(current_store().owner());
// provide_context_to_scope(scope, owner).expect("in a virtual dom")
// }
// }
match consume_context_from_scope(scope) {
Some(rt) => rt,
None => {
let owner = Rc::new(S::owner());
provide_context_to_scope(scope, owner).expect("in a virtual dom")
}
}
}
/// CopyValue is a wrapper around a value to make the value mutable and Copy.
@ -144,7 +142,7 @@ impl<T: 'static, S: Storage<T>> CopyValue<T, S> {
}
/// Set the value. If the value has been dropped, this will panic.
pub fn set(&mut self, value: T) {
pub fn set(&self, value: T) {
*self.write() = value;
}
@ -159,6 +157,11 @@ impl<T: 'static, S: Storage<T>> CopyValue<T, S> {
let mut write = self.write();
f(&mut *write)
}
/// Get the generational id of the value.
pub fn id(&self) -> GenerationalBoxId {
self.value.id()
}
}
impl<T: Clone + 'static, S: Storage<T>> CopyValue<T, S> {

View file

@ -2,8 +2,8 @@ use dioxus_core::prelude::*;
use generational_box::Storage;
use crate::dependency::Dependency;
use crate::use_signal;
use crate::{get_effect_stack, signal::SignalData, CopyValue, Effect, ReadOnlySignal, Signal};
use crate::{get_effect_ref, signal::SignalData, CopyValue, Effect, ReadOnlySignal, Signal};
use crate::{use_signal, EffectInner, EFFECT_STACK};
/// Creates a new unsync Selector. The selector will be run immediately and whenever any signal it reads changes.
///
@ -30,7 +30,6 @@ pub fn use_selector<R: PartialEq>(
use_maybe_sync_selector(cx, f)
}
/// Creates a new Selector that may be sync. The selector will be run immediately and whenever any signal it reads changes.
///
/// Selectors can be used to efficiently compute derived data from signals.
@ -56,7 +55,6 @@ pub fn use_maybe_sync_selector<R: PartialEq, S: Storage<SignalData<R>>>(
*cx.use_hook(|| maybe_sync_selector(f))
}
/// Creates a new unsync Selector with some local dependencies. The selector will be run immediately and whenever any signal it reads or any dependencies it tracks changes
///
/// Selectors can be used to efficiently compute derived data from signals.
@ -74,14 +72,14 @@ pub fn use_maybe_sync_selector<R: PartialEq, S: Storage<SignalData<R>>>(
/// }
/// ```
#[must_use = "Consider using `use_effect` to rerun a callback when dependencies change"]
pub fn use_selector_with_dependencies<R: PartialEq, D: Dependency, >(
pub fn use_selector_with_dependencies<R: PartialEq, D: Dependency>(
cx: &ScopeState,
dependencies: D,
mut f: impl FnMut(D::Out) -> R + 'static,
f: impl FnMut(D::Out) -> R + 'static,
) -> ReadOnlySignal<R>
where
D::Out: 'static,
{
{
use_maybe_sync_selector_with_dependencies(cx, dependencies, f)
}
@ -102,7 +100,11 @@ where
/// }
/// ```
#[must_use = "Consider using `use_effect` to rerun a callback when dependencies change"]
pub fn use_maybe_sync_selector_with_dependencies<R: PartialEq, D: Dependency, S: Storage<SignalData<R>>>(
pub fn use_maybe_sync_selector_with_dependencies<
R: PartialEq,
D: Dependency,
S: Storage<SignalData<R>>,
>(
cx: &ScopeState,
dependencies: D,
mut f: impl FnMut(D::Out) -> R + 'static,
@ -127,9 +129,7 @@ where
/// Creates a new unsync Selector. The selector will be run immediately and whenever any signal it reads changes.
///
/// Selectors can be used to efficiently compute derived data from signals.
pub fn selector<R: PartialEq>(
mut f: impl FnMut() -> R + 'static,
) -> ReadOnlySignal<R> {
pub fn selector<R: PartialEq>(f: impl FnMut() -> R + 'static) -> ReadOnlySignal<R> {
maybe_sync_selector(f)
}
@ -144,33 +144,36 @@ pub fn maybe_sync_selector<R: PartialEq, S: Storage<SignalData<R>>>(
};
let effect = Effect {
source: current_scope_id().expect("in a virtual dom"),
callback: CopyValue::invalid(),
effect_stack: get_effect_stack(),
inner: CopyValue::invalid(),
};
{
get_effect_stack().effects.write().push(effect);
EFFECT_STACK.with(|stack| stack.effects.write().push(effect));
}
state.inner.value.set(SignalData {
subscribers: Default::default(),
update_any: schedule_update_any().expect("in a virtual dom"),
value: f(),
effect_stack: get_effect_stack(),
effect_ref: get_effect_ref(),
});
{
get_effect_stack().effects.write().pop();
EFFECT_STACK.with(|stack| stack.effects.write().pop());
}
effect.callback.value.set(Box::new(move || {
let value = f();
let changed = {
let old = state.inner.read();
value != old.value
};
if changed {
state.set(value)
}
}));
let invalid_id = state.inner.value.id();
effect.inner.value.set(EffectInner {
callback: Box::new(move || {
let value = f();
let changed = {
let old = state.inner.read();
value != old.value
};
if changed {
state.set(value)
}
}),
id: invalid_id,
});
ReadOnlySignal::new_maybe_sync(state)
}

View file

@ -11,10 +11,10 @@ use dioxus_core::{
prelude::{current_scope_id, has_context, provide_context, schedule_update_any},
ScopeId, ScopeState,
};
use generational_box::{AnyStorage, Mappable, MappableMut, Storage, SyncStorage, UnsyncStorage};
use generational_box::{GenerationalBoxId, Mappable, MappableMut, Storage, UnsyncStorage};
use parking_lot::RwLock;
use crate::{get_effect_stack, CopyValue, Effect, EffectStack};
use crate::{get_effect_ref, CopyValue, EffectStackRef, EFFECT_STACK};
/// Creates a new Signal. Signals are a Copy state management solution with automatic dependency tracking.
///
@ -91,8 +91,7 @@ pub fn use_signal_sync<T: Send + Sync + 'static>(
cx: &ScopeState,
f: impl FnOnce() -> T,
) -> Signal<T, UnsyncStorage> {
// *cx.use_hook(|| Signal::new(f()))
todo!()
*cx.use_hook(|| Signal::new(f()))
}
#[derive(Clone)]
@ -127,14 +126,14 @@ fn current_unsubscriber() -> Unsubscriber {
#[derive(Default)]
pub(crate) struct SignalSubscribers {
pub(crate) subscribers: Vec<ScopeId>,
pub(crate) effect_subscribers: Vec<Effect>,
pub(crate) effect_subscribers: Vec<GenerationalBoxId>,
}
/// The data stored for tracking in a signal.
pub struct SignalData<T> {
pub(crate) subscribers: Arc<RwLock<SignalSubscribers>>,
pub(crate) update_any: Arc<dyn Fn(ScopeId) + Sync + Send>,
pub(crate) effect_stack: EffectStack,
pub(crate) effect_ref: EffectStackRef,
pub(crate) value: T,
}
@ -208,7 +207,7 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
subscribers: Default::default(),
update_any: schedule_update_any().expect("in a virtual dom"),
value,
effect_stack: get_effect_stack(),
effect_ref: get_effect_ref(),
}),
}
}
@ -221,7 +220,7 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
subscribers: Default::default(),
update_any: schedule_update_any().expect("in a virtual dom"),
value,
effect_stack: get_effect_stack(),
effect_ref: get_effect_ref(),
},
owner,
),
@ -239,12 +238,12 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
&self,
) -> <<S as Storage<SignalData<T>>>::Ref as Mappable<SignalData<T>>>::Mapped<T> {
let inner = self.inner.read();
if let Some(effect) = inner.effect_stack.current() {
if let Some(effect) = EFFECT_STACK.with(|stack| stack.current()) {
let subscribers = inner.subscribers.read();
if !subscribers.effect_subscribers.contains(&effect) {
if !subscribers.effect_subscribers.contains(&effect.inner.id()) {
drop(subscribers);
let mut subscribers = inner.subscribers.write();
subscribers.effect_subscribers.push(effect);
subscribers.effect_subscribers.push(effect.inner.id());
}
} else if let Some(current_scope_id) = current_scope_id() {
// only subscribe if the vdom is rendering
@ -295,18 +294,19 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
}
}
let self_read = &self.inner.read();
let subscribers = {
let self_read = self.inner.read();
let mut effects = &mut self_read.subscribers.write().effect_subscribers;
let effects = &mut self_read.subscribers.write().effect_subscribers;
std::mem::take(&mut *effects)
};
let effect_ref = &self_read.effect_ref;
for effect in subscribers {
tracing::trace!(
"Write on {:?} triggered effect {:?}",
self.inner.value,
effect
);
effect.try_run();
effect_ref.rerun_effect(effect);
}
}
@ -328,6 +328,11 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
let mut write = self.write();
f(&mut *write)
}
/// Get the generational id of the signal.
pub fn id(&self) -> generational_box::GenerationalBoxId {
self.inner.id()
}
}
impl<T: Clone + 'static, S: Storage<SignalData<T>>> Signal<T, S> {