From c9603ea984f34276babb3902c3971e628d310f7c Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Tue, 5 Mar 2024 18:24:01 -0600 Subject: [PATCH 1/8] only poll suspended futures --- packages/core/src/global_context.rs | 6 +- packages/core/src/runtime.rs | 10 ++- packages/core/src/scope_arena.rs | 8 +- packages/core/src/scope_context.rs | 17 +++-- packages/core/src/tasks.rs | 4 + packages/core/src/virtual_dom.rs | 110 +++++++++++++++++++++------- packages/core/tests/suspense.rs | 31 +++++++- 7 files changed, 140 insertions(+), 46 deletions(-) diff --git a/packages/core/src/global_context.rs b/packages/core/src/global_context.rs index 738bf64c3..d84f138be 100644 --- a/packages/core/src/global_context.rs +++ b/packages/core/src/global_context.rs @@ -50,9 +50,9 @@ pub fn provide_root_context(value: T) -> T { .expect("to be in a dioxus runtime") } -/// Suspends the current component -pub fn suspend() -> Option { - Runtime::with_current_scope(|cx| cx.suspend()); +/// Suspended the current component on a specific task and then return None +pub fn suspend(task: Task) -> Element { + Runtime::with_current_scope(|cx| cx.suspend(task)); None } diff --git a/packages/core/src/runtime.rs b/packages/core/src/runtime.rs index 355e4e9ac..dbfe125b2 100644 --- a/packages/core/src/runtime.rs +++ b/packages/core/src/runtime.rs @@ -1,3 +1,5 @@ +use rustc_hash::FxHashSet; + use crate::{ innerlude::{LocalTask, SchedulerMsg}, render_signal::RenderSignal, @@ -24,11 +26,14 @@ pub struct Runtime { // We use this to track the current task pub(crate) current_task: Cell>, - pub(crate) rendering: Cell, - /// Tasks created with cx.spawn pub(crate) tasks: RefCell>>, + // Currently suspended tasks + pub(crate) suspended_tasks: RefCell>, + + pub(crate) rendering: Cell, + pub(crate) sender: futures_channel::mpsc::UnboundedSender, // Synchronous tasks need to be run after the next render. The virtual dom stores a list of those tasks to send a signal to them when the next render is done. @@ -45,6 +50,7 @@ impl Runtime { scope_stack: Default::default(), current_task: Default::default(), tasks: Default::default(), + suspended_tasks: Default::default(), }) } diff --git a/packages/core/src/scope_arena.rs b/packages/core/src/scope_arena.rs index 5f5424143..46de9c184 100644 --- a/packages/core/src/scope_arena.rs +++ b/packages/core/src/scope_arena.rs @@ -41,7 +41,6 @@ impl VirtualDom { let new_nodes = { let context = scope.state(); - context.suspended.set(false); context.hook_index.set(0); // Run all pre-render hooks @@ -70,12 +69,11 @@ impl VirtualDom { self.dirty_scopes .remove(&ScopeOrder::new(context.height, scope_id)); - if context.suspended.get() { + if let Some(task) = context.last_suspendable_task.take() { if matches!(new_nodes, RenderReturn::Aborted(_)) { - self.suspended_scopes.insert(context.id); + tracing::trace!("Suspending {:?} on {:?}", scope_id, task); + self.runtime.suspended_tasks.borrow_mut().insert(task); } - } else if !self.suspended_scopes.is_empty() { - _ = self.suspended_scopes.remove(&context.id); } self.runtime.scope_stack.borrow_mut().pop(); diff --git a/packages/core/src/scope_context.rs b/packages/core/src/scope_context.rs index 9fa54e71c..0acde5055 100644 --- a/packages/core/src/scope_context.rs +++ b/packages/core/src/scope_context.rs @@ -16,13 +16,14 @@ pub(crate) struct Scope { pub(crate) parent_id: Option, pub(crate) height: u32, pub(crate) render_count: Cell, - pub(crate) suspended: Cell, // Note: the order of the hook and context fields is important. The hooks field must be dropped before the contexts field in case a hook drop implementation tries to access a context. pub(crate) hooks: RefCell>>, pub(crate) hook_index: Cell, pub(crate) shared_contexts: RefCell>>, pub(crate) spawned_tasks: RefCell>, + /// The task that was last spawned that may suspend. We use this task to check what task to suspend in the event of an early None return from a component + pub(crate) last_suspendable_task: Cell>, pub(crate) before_render: RefCell>>, pub(crate) after_render: RefCell>>, } @@ -40,9 +41,9 @@ impl Scope { parent_id, height, render_count: Cell::new(0), - suspended: Cell::new(false), shared_contexts: RefCell::new(vec![]), spawned_tasks: RefCell::new(FxHashSet::default()), + last_suspendable_task: Cell::new(None), hooks: RefCell::new(vec![]), hook_index: Cell::new(0), before_render: RefCell::new(vec![]), @@ -241,9 +242,9 @@ impl Scope { Runtime::with(|rt| rt.spawn(self.id, fut)).expect("Runtime to exist") } - /// Mark this component as suspended and then return None - pub fn suspend(&self) -> Option { - self.suspended.set(true); + /// Mark this component as suspended on a specific task and then return None + pub fn suspend(&self, task: Task) -> Option { + self.last_suspendable_task.set(Some(task)); None } @@ -340,10 +341,10 @@ impl ScopeId { .expect("to be in a dioxus runtime") } - /// Suspends the current component - pub fn suspend(self) -> Option { + /// Suspended a component on a specific task and then return None + pub fn suspend(self, task: Task) -> Option { Runtime::with_scope(self, |cx| { - cx.suspend(); + cx.suspend(task); }); None } diff --git a/packages/core/src/tasks.rs b/packages/core/src/tasks.rs index 39f353a4c..2335afac3 100644 --- a/packages/core/src/tasks.rs +++ b/packages/core/src/tasks.rs @@ -173,6 +173,9 @@ impl Runtime { // Remove it from the scheduler self.tasks.borrow_mut().try_remove(id.0); + + // Remove it from the suspended tasks + self.suspended_tasks.borrow_mut().remove(&id); } // Remove the scope from the stack @@ -187,6 +190,7 @@ impl Runtime { /// /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you pub(crate) fn remove_task(&self, id: Task) -> Option> { + self.suspended_tasks.borrow_mut().remove(&id); self.tasks.borrow_mut().try_remove(id.0) } } diff --git a/packages/core/src/virtual_dom.rs b/packages/core/src/virtual_dom.rs index db7cc349c..dec5b261a 100644 --- a/packages/core/src/virtual_dom.rs +++ b/packages/core/src/virtual_dom.rs @@ -18,7 +18,7 @@ use crate::{ AttributeValue, ComponentFunction, Element, Event, Mutations, }; use futures_util::StreamExt; -use rustc_hash::{FxHashMap, FxHashSet}; +use rustc_hash::FxHashMap; use slab::Slab; use std::{any::Any, rc::Rc}; use tracing::instrument; @@ -201,9 +201,6 @@ pub struct VirtualDom { pub(crate) runtime: Rc, - // Currently suspended scopes - pub(crate) suspended_scopes: FxHashSet, - rx: futures_channel::mpsc::UnboundedReceiver, } @@ -319,7 +316,6 @@ impl VirtualDom { queued_templates: Default::default(), elements: Default::default(), mounts: Default::default(), - suspended_scopes: Default::default(), }; let root = dom.new_scope(Box::new(root), "app"); @@ -448,13 +444,6 @@ impl VirtualDom { /// ``` #[instrument(skip(self), level = "trace", name = "VirtualDom::wait_for_work")] pub async fn wait_for_work(&mut self) { - // And then poll the futures - self.poll_tasks().await; - } - - /// Poll the scheduler for any work - #[instrument(skip(self), level = "trace", name = "VirtualDom::poll_tasks")] - async fn poll_tasks(&mut self) { loop { // Process all events - Scopes are marked dirty, etc // Sometimes when wakers fire we get a slew of updates at once, so its important that we drain this completely @@ -469,17 +458,22 @@ impl VirtualDom { let _runtime = RuntimeGuard::new(self.runtime.clone()); // There isn't any more work we can do synchronously. Wait for any new work to be ready - match self.rx.next().await.expect("channel should never close") { - SchedulerMsg::Immediate(id) => self.mark_dirty(id), - SchedulerMsg::TaskNotified(id) => { - // Instead of running the task immediately, we insert it into the runtime's task queue. - // The task may be marked dirty at the same time as the scope that owns the task is dropped. - self.mark_task_dirty(id); - } - }; + self.wait_for_event().await; } } + /// Wait for the next event to trigger and add it to the queue + async fn wait_for_event(&mut self) { + match self.rx.next().await.expect("channel should never close") { + SchedulerMsg::Immediate(id) => self.mark_dirty(id), + SchedulerMsg::TaskNotified(id) => { + // Instead of running the task immediately, we insert it into the runtime's task queue. + // The task may be marked dirty at the same time as the scope that owns the task is dropped. + self.mark_task_dirty(id); + } + }; + } + /// Queue any pending events fn queue_events(&mut self) { // Prevent a task from deadlocking the runtime by repeatedly queueing itself @@ -494,7 +488,6 @@ impl VirtualDom { /// Process all events in the queue until there are no more left #[instrument(skip(self), level = "trace", name = "VirtualDom::process_events")] pub fn process_events(&mut self) { - let _runtime = RuntimeGuard::new(self.runtime.clone()); self.queue_events(); // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures @@ -502,6 +495,14 @@ impl VirtualDom { return; } + self.poll_tasks() + } + + /// Poll any queued tasks + #[instrument(skip(self), level = "trace", name = "VirtualDom::poll_tasks")] + fn poll_tasks(&mut self) { + // Make sure we set the runtime since we're running user code + let _runtime = RuntimeGuard::new(self.runtime.clone()); // Next, run any queued tasks // We choose not to poll the deadline since we complete pretty quickly anyways while let Some(task) = self.dirty_scopes.pop_task() { @@ -617,7 +618,6 @@ impl VirtualDom { { let _runtime = RuntimeGuard::new(self.runtime.clone()); // Then, poll any tasks that might be pending in the scope - // This will run effects, so this **must** be done after the scope is diffed for task in work.tasks { let _ = self.runtime.handle_task_wakeup(task); } @@ -649,15 +649,75 @@ impl VirtualDom { #[instrument(skip(self), level = "trace", name = "VirtualDom::wait_for_suspense")] pub async fn wait_for_suspense(&mut self) { loop { - if self.suspended_scopes.is_empty() { + if self.runtime.suspended_tasks.borrow().is_empty() { break; } // Wait for a work to be ready (IE new suspense leaves to pop up) - self.poll_tasks().await; + 'wait_for_work: loop { + // Process all events - Scopes are marked dirty, etc + // Sometimes when wakers fire we get a slew of updates at once, so its important that we drain this completely + self.queue_events(); + + // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures + if self.dirty_scopes.has_dirty_scopes() { + println!("dirty scopes"); + break; + } + + { + // Make sure we set the runtime since we're running user code + let _runtime = RuntimeGuard::new(self.runtime.clone()); + // Next, run any queued tasks + // We choose not to poll the deadline since we complete pretty quickly anyways + while let Some(task) = self.dirty_scopes.pop_task() { + // If the scope doesn't exist for whatever reason, then we should skip it + if !self.scopes.contains(task.order.id.0) { + continue; + } + + // Then poll any tasks that might be pending + let tasks = task.tasks_queued.into_inner(); + for task in tasks { + if self.runtime.suspended_tasks.borrow().contains(&task) { + let _ = self.runtime.handle_task_wakeup(task); + // Running that task, may mark a scope higher up as dirty. If it does, return from the function early + self.queue_events(); + if self.dirty_scopes.has_dirty_scopes() { + break 'wait_for_work; + } + } + } + } + } + + self.wait_for_event().await; + } // Render whatever work needs to be rendered, unlocking new futures and suspense leaves - self.render_immediate(&mut NoOpMutations); + while let Some(work) = self.dirty_scopes.pop_work() { + // If the scope doesn't exist for whatever reason, then we should skip it + if !self.scopes.contains(work.scope.id.0) { + continue; + } + + { + let _runtime = RuntimeGuard::new(self.runtime.clone()); + // Then, poll any tasks that might be pending in the scope + for task in work.tasks { + // During suspense, we only want to run tasks that are suspended + if self.runtime.suspended_tasks.borrow().contains(&task) { + let _ = self.runtime.handle_task_wakeup(task); + } + } + // If the scope is dirty, run the scope and get the mutations + if work.rerun_scope { + let new_nodes = self.run_scope(work.scope.id); + + self.diff_scope(&mut NoOpMutations, work.scope.id, new_nodes); + } + } + } } } diff --git a/packages/core/tests/suspense.rs b/packages/core/tests/suspense.rs index 2957bc0a8..56de18c92 100644 --- a/packages/core/tests/suspense.rs +++ b/packages/core/tests/suspense.rs @@ -1,9 +1,10 @@ use dioxus::prelude::*; +use std::future::poll_fn; +use std::task::Poll; #[test] fn suspense_resolves() { // wait just a moment, not enough time for the boundary to resolve - tokio::runtime::Builder::new_current_thread() .build() .unwrap() @@ -31,11 +32,35 @@ fn app() -> Element { fn suspended_child() -> Element { let mut val = use_signal(|| 0); + // Tasks that are not suspended should never be polled + spawn(async move { + panic!("Non-suspended task was polled"); + }); + + // // Memos should still work like normal + // let memo = use_memo(move || val * 2); + // assert_eq!(memo, val * 2); + if val() < 3 { - spawn(async move { + let task = spawn(async move { + // Poll each task 3 times + let mut count = 0; + poll_fn(|cx| { + println!("polling... {}", count); + if count < 3 { + count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await; + + println!("waiting... {}", val); val += 1; }); - suspend()?; + suspend(task)?; } rsx!("child") From 716eb114264aaa7bc53a0adfaf33a5a8d19e50d7 Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Tue, 5 Mar 2024 18:39:32 -0600 Subject: [PATCH 2/8] update suspend in use_server_future --- packages/fullstack/examples/static-hydrated/src/main.rs | 2 +- packages/fullstack/src/hooks/server_future.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/fullstack/examples/static-hydrated/src/main.rs b/packages/fullstack/examples/static-hydrated/src/main.rs index 3fb981cee..e9d2c77c7 100644 --- a/packages/fullstack/examples/static-hydrated/src/main.rs +++ b/packages/fullstack/examples/static-hydrated/src/main.rs @@ -36,7 +36,7 @@ async fn main() { } // Hydrate the page -#[cfg(all(feature = "web", not(feature = "server")))] +#[cfg(not(feature = "server"))] fn main() { dioxus_web::launch_with_props( dioxus_fullstack::router::RouteWithCfg::, diff --git a/packages/fullstack/src/hooks/server_future.rs b/packages/fullstack/src/hooks/server_future.rs index 8e89a8ca7..63e217c16 100644 --- a/packages/fullstack/src/hooks/server_future.rs +++ b/packages/fullstack/src/hooks/server_future.rs @@ -54,7 +54,7 @@ where // Suspend if the value isn't ready match resource.state().cloned() { UseResourceState::Pending => { - suspend(); + suspend(resource.task()); None } _ => Some(resource), From 492f0329bfd123670da3cf2e2fb7625ddd2115db Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Wed, 6 Mar 2024 11:38:28 -0600 Subject: [PATCH 3/8] remove flume, implement lazier memos --- Cargo.lock | 1 - examples/memo_chain.rs | 6 +- packages/hooks/src/use_effect.rs | 5 +- packages/hooks/src/use_memo.rs | 57 ++------ packages/hooks/src/use_resource.rs | 12 +- packages/signals/Cargo.toml | 1 - packages/signals/src/copy_value.rs | 2 +- packages/signals/src/global/memo.rs | 18 +-- packages/signals/src/global/signal.rs | 2 +- packages/signals/src/impls.rs | 11 ++ packages/signals/src/lib.rs | 3 + packages/signals/src/memo.rs | 173 +++++++++++++++++++++++ packages/signals/src/reactive_context.rs | 110 ++++++-------- packages/signals/src/signal.rs | 41 +----- packages/signals/src/write.rs | 2 +- 15 files changed, 267 insertions(+), 177 deletions(-) create mode 100644 packages/signals/src/memo.rs diff --git a/Cargo.lock b/Cargo.lock index 1d6e8ce04..e6f1c5a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2754,7 +2754,6 @@ version = "0.5.0-alpha.0" dependencies = [ "dioxus", "dioxus-core", - "flume", "futures-channel", "futures-util", "generational-box", diff --git a/examples/memo_chain.rs b/examples/memo_chain.rs index bbcca9954..745c5d01e 100644 --- a/examples/memo_chain.rs +++ b/examples/memo_chain.rs @@ -26,11 +26,7 @@ fn app() -> Element { } #[component] -fn Child( - state: ReadOnlySignal, - items: ReadOnlySignal>, - depth: ReadOnlySignal, -) -> Element { +fn Child(state: Memo, items: Memo>, depth: ReadOnlySignal) -> Element { if depth() == 0 { return None; } diff --git a/packages/hooks/src/use_effect.rs b/packages/hooks/src/use_effect.rs index 33e65309b..069b58e3d 100644 --- a/packages/hooks/src/use_effect.rs +++ b/packages/hooks/src/use_effect.rs @@ -1,5 +1,6 @@ use dioxus_core::prelude::*; use dioxus_signals::ReactiveContext; +use futures_util::StreamExt; /// `use_effect` will subscribe to any changes in the signal values it captures /// effects will always run after first mount and then whenever the signal values change @@ -26,13 +27,13 @@ pub fn use_effect(mut callback: impl FnMut() + 'static) { use_hook(|| { spawn(async move { - let rc = ReactiveContext::new_with_origin(location); + let (rc, mut changed) = ReactiveContext::new_with_origin(location); loop { // Run the effect rc.run_in(&mut callback); // Wait for context to change - rc.changed().await; + let _ = changed.next().await; // Wait for the dom the be finished with sync work wait_for_next_render().await; diff --git a/packages/hooks/src/use_memo.rs b/packages/hooks/src/use_memo.rs index adbea360f..36a65f83a 100644 --- a/packages/hooks/src/use_memo.rs +++ b/packages/hooks/src/use_memo.rs @@ -1,8 +1,10 @@ use crate::dependency::Dependency; -use crate::use_signal; +use crate::{use_callback, use_signal}; use dioxus_core::prelude::*; +use dioxus_signals::Memo; use dioxus_signals::{ReactiveContext, ReadOnlySignal, Readable, Signal, SignalData}; use dioxus_signals::{Storage, Writable}; +use futures_util::StreamExt; /// Creates a new unsync Selector. The selector will be run immediately and whenever any signal it reads changes. /// @@ -22,51 +24,9 @@ use dioxus_signals::{Storage, Writable}; /// } /// ``` #[track_caller] -pub fn use_memo(f: impl FnMut() -> R + 'static) -> ReadOnlySignal { - use_maybe_sync_memo(f) -} - -/// Creates a new Selector that may be sync. The selector will be run immediately and whenever any signal it reads changes. -/// -/// Selectors can be used to efficiently compute derived data from signals. -/// -/// ```rust -/// use dioxus::prelude::*; -/// use dioxus_signals::*; -/// -/// fn App() -> Element { -/// let mut count = use_signal(|| 0); -/// let double = use_memo(move || count * 2); -/// count += 1; -/// assert_eq!(double(), count * 2); -/// -/// rsx! { "{double}" } -/// } -/// ``` -#[track_caller] -pub fn use_maybe_sync_memo>>( - mut f: impl FnMut() -> R + 'static, -) -> ReadOnlySignal { - use_hook(|| { - // Create a new reactive context for the memo - let rc = ReactiveContext::new(); - - // Create a new signal in that context, wiring up its dependencies and subscribers - let mut state: Signal = rc.run_in(|| Signal::new_maybe_sync(f())); - - spawn(async move { - loop { - rc.changed().await; - let new = rc.run_in(&mut f); - if new != *state.peek() { - *state.write() = new; - } - } - }); - - // And just return the readonly variant of that signal - ReadOnlySignal::new_maybe_sync(state) - }) +pub fn use_memo(f: impl FnMut() -> R + 'static) -> Memo { + let mut callback = use_callback(f); + use_hook(|| Signal::memo(move || callback.call())) } /// Creates a new unsync Selector with some local dependencies. The selector will be run immediately and whenever any signal it reads or any dependencies it tracks changes @@ -127,7 +87,7 @@ where let selector = use_hook(|| { // Get the current reactive context - let rc = ReactiveContext::new(); + let (rc, mut changed) = ReactiveContext::new(); // Create a new signal in that context, wiring up its dependencies and subscribers let mut state: Signal = @@ -135,7 +95,8 @@ where spawn(async move { loop { - rc.changed().await; + // Wait for context to change + let _ = changed.next().await; let new = rc.run_in(|| f(dependencies_signal.read().clone())); if new != *state.peek() { diff --git a/packages/hooks/src/use_resource.rs b/packages/hooks/src/use_resource.rs index 389697d74..4fa39c222 100644 --- a/packages/hooks/src/use_resource.rs +++ b/packages/hooks/src/use_resource.rs @@ -6,8 +6,8 @@ use dioxus_core::{ Task, }; use dioxus_signals::*; -use futures_util::{future, pin_mut, FutureExt}; -use std::future::Future; +use futures_util::{future, pin_mut, FutureExt, StreamExt}; +use std::{cell::Cell, future::Future, rc::Rc}; /// A memo that resolve to a value asynchronously. /// Unlike `use_future`, `use_resource` runs on the **server** @@ -44,7 +44,10 @@ where { let mut value = use_signal(|| None); let mut state = use_signal(|| UseResourceState::Pending); - let rc = use_hook(ReactiveContext::new); + let (rc, changed) = use_hook(|| { + let (rc, changed) = ReactiveContext::new(); + (rc, Rc::new(Cell::new(Some(changed)))) + }); let mut cb = use_callback(move || { // Create the user's task @@ -70,10 +73,11 @@ where let mut task = use_hook(|| Signal::new(cb.call())); use_hook(|| { + let mut changed = changed.take().unwrap(); spawn(async move { loop { // Wait for the dependencies to change - rc.changed().await; + let _ = changed.next().await; // Stop the old task task.write().cancel(); diff --git a/packages/signals/Cargo.toml b/packages/signals/Cargo.toml index b8b29c2c5..a4a94393f 100644 --- a/packages/signals/Cargo.toml +++ b/packages/signals/Cargo.toml @@ -22,7 +22,6 @@ once_cell = "1.18.0" rustc-hash = { workspace = true } futures-channel = { workspace = true } futures-util = { workspace = true } -flume = { version = "0.11.0", default-features = false, features = ["async"] } [dev-dependencies] dioxus = { workspace = true } diff --git a/packages/signals/src/copy_value.rs b/packages/signals/src/copy_value.rs index ca318bf2c..fb984eb6f 100644 --- a/packages/signals/src/copy_value.rs +++ b/packages/signals/src/copy_value.rs @@ -237,7 +237,7 @@ impl> Writable for CopyValue { S::try_map_mut(mut_, f) } - fn try_write(&self) -> Result, generational_box::BorrowMutError> { + fn try_write(&mut self) -> Result, generational_box::BorrowMutError> { self.value.try_write() } diff --git a/packages/signals/src/global/memo.rs b/packages/signals/src/global/memo.rs index 4a94d29f6..61d1c10f4 100644 --- a/packages/signals/src/global/memo.rs +++ b/packages/signals/src/global/memo.rs @@ -1,9 +1,9 @@ -use crate::{read::Readable, ReadableRef}; +use crate::{read::Readable, Memo, ReadableRef}; use dioxus_core::prelude::{IntoAttributeValue, ScopeId}; use generational_box::UnsyncStorage; use std::{mem::MaybeUninit, ops::Deref}; -use crate::{ReadOnlySignal, Signal}; +use crate::Signal; use super::get_global_context; @@ -22,14 +22,14 @@ impl GlobalMemo { } /// Get the signal that backs this global. - pub fn signal(&self) -> ReadOnlySignal { + pub fn memo(&self) -> Memo { let key = self as *const _ as *const (); let context = get_global_context(); let read = context.signal.borrow(); match read.get(&key) { - Some(signal) => *signal.downcast_ref::>().unwrap(), + Some(signal) => *signal.downcast_ref::>().unwrap(), None => { drop(read); // Constructors are always run in the root scope @@ -47,7 +47,7 @@ impl GlobalMemo { /// Get the generational id of the signal. pub fn id(&self) -> generational_box::GenerationalBoxId { - self.signal().id() + self.memo().id() } } @@ -57,12 +57,12 @@ impl Readable for GlobalMemo { #[track_caller] fn try_read(&self) -> Result, generational_box::BorrowError> { - self.signal().try_read() + self.memo().try_read() } #[track_caller] fn peek(&self) -> ReadableRef { - self.signal().peek() + self.memo().peek() } } @@ -71,7 +71,7 @@ where T: Clone + IntoAttributeValue, { fn into_value(self) -> dioxus_core::AttributeValue { - self.signal().into_value() + self.memo().into_value() } } @@ -81,7 +81,7 @@ impl PartialEq for GlobalMemo { } } -/// Allow calling a signal with signal() syntax +/// Allow calling a signal with memo() syntax /// /// Currently only limited to copy types, though could probably specialize for string/arc/rc impl Deref for GlobalMemo { diff --git a/packages/signals/src/global/signal.rs b/packages/signals/src/global/signal.rs index dda337232..3dd57f484 100644 --- a/packages/signals/src/global/signal.rs +++ b/packages/signals/src/global/signal.rs @@ -103,7 +103,7 @@ impl Writable for GlobalSignal { } #[track_caller] - fn try_write(&self) -> Result, generational_box::BorrowMutError> { + fn try_write(&mut self) -> Result, generational_box::BorrowMutError> { self.signal().try_write() } } diff --git a/packages/signals/src/impls.rs b/packages/signals/src/impls.rs index 3d120b54e..806a1fe1a 100644 --- a/packages/signals/src/impls.rs +++ b/packages/signals/src/impls.rs @@ -1,4 +1,5 @@ use crate::copy_value::CopyValue; +use crate::memo::Memo; use crate::read::Readable; use crate::signal::Signal; use crate::write::Writable; @@ -159,6 +160,16 @@ impl>> Clone for ReadOnlySignal { impl>> Copy for ReadOnlySignal {} +read_impls!(Memo: PartialEq); + +impl Clone for Memo { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for Memo {} + read_impls!(GlobalSignal); default_impl!(GlobalSignal); diff --git a/packages/signals/src/lib.rs b/packages/signals/src/lib.rs index 46953ec5a..1492fe205 100644 --- a/packages/signals/src/lib.rs +++ b/packages/signals/src/lib.rs @@ -19,6 +19,9 @@ pub use map::*; // mod comparer; // pub use comparer::*; +mod memo; +pub use memo::*; + mod global; pub use global::*; diff --git a/packages/signals/src/memo.rs b/packages/signals/src/memo.rs new file mode 100644 index 000000000..e8ad0ec44 --- /dev/null +++ b/packages/signals/src/memo.rs @@ -0,0 +1,173 @@ +use crate::write::Writable; +use crate::{read::Readable, ReactiveContext, ReadableRef, Signal}; +use crate::{CopyValue, ReadOnlySignal}; +use std::{ + cell::RefCell, + ops::Deref, + panic::Location, + sync::{atomic::AtomicBool, Arc}, +}; + +use dioxus_core::prelude::*; +use futures_util::StreamExt; +use generational_box::UnsyncStorage; +struct UpdateInformation { + dirty: Arc, + callback: RefCell T>>, +} + +/// A value that is memoized. This is useful for caching the result of a computation. +pub struct Memo { + inner: Signal, + update: CopyValue>, +} + +impl From> for ReadOnlySignal +where + T: PartialEq, +{ + fn from(val: Memo) -> Self { + ReadOnlySignal::new(val.inner) + } +} + +impl Memo { + /// Create a new memo + #[track_caller] + pub fn new(mut f: impl FnMut() -> T + 'static) -> Self + where + T: PartialEq, + { + let dirty = Arc::new(AtomicBool::new(true)); + let (tx, mut rx) = futures_channel::mpsc::unbounded(); + + let callback = { + let dirty = dirty.clone(); + move || { + dirty.store(true, std::sync::atomic::Ordering::Relaxed); + tx.unbounded_send(()).unwrap(); + } + }; + let rc = ReactiveContext::new_with_callback( + callback, + current_scope_id().unwrap(), + Location::caller(), + ); + + // Create a new signal in that context, wiring up its dependencies and subscribers + let value = rc.run_in(&mut f); + let recompute = RefCell::new(Box::new(f) as Box T>); + let update = CopyValue::new(UpdateInformation { + dirty, + callback: recompute, + }); + let state: Signal = Signal::new(value); + + let memo = Memo { + inner: state, + update, + }; + + spawn(async move { + while rx.next().await.is_some() { + memo.recompute(); + } + }); + + memo + } + + /// Rerun the computation and update the value of the memo if the result has changed. + fn recompute(&self) + where + T: PartialEq, + { + let mut update_copy = self.update; + let update_write = update_copy.write(); + let peak = self.inner.peek(); + let new_value = (update_write.callback.borrow_mut())(); + if new_value != *peak { + drop(peak); + let mut copy = self.inner; + let mut write = copy.write(); + *write = new_value; + update_write + .dirty + .store(false, std::sync::atomic::Ordering::Relaxed); + } + } + + /// Get the scope that the signal was created in. + pub fn origin_scope(&self) -> ScopeId { + self.inner.origin_scope() + } + + /// Get the id of the signal. + pub fn id(&self) -> generational_box::GenerationalBoxId { + self.inner.id() + } +} + +impl Readable for Memo +where + T: PartialEq, +{ + type Target = T; + type Storage = UnsyncStorage; + + #[track_caller] + fn try_read(&self) -> Result, generational_box::BorrowError> { + let read = self.inner.try_read(); + match read { + Ok(r) => { + let needs_update = self + .update + .read() + .dirty + .swap(false, std::sync::atomic::Ordering::Relaxed); + if needs_update { + drop(r); + self.recompute(); + self.inner.try_read() + } else { + Ok(r) + } + } + Err(e) => Err(e), + } + } + + /// Get the current value of the signal. **Unlike read, this will not subscribe the current scope to the signal which can cause parts of your UI to not update.** + /// + /// If the signal has been dropped, this will panic. + #[track_caller] + fn peek(&self) -> ReadableRef { + self.inner.peek() + } +} + +impl IntoAttributeValue for Memo +where + T: Clone + IntoAttributeValue + PartialEq, +{ + fn into_value(self) -> dioxus_core::AttributeValue { + self.with(|f| f.clone().into_value()) + } +} + +impl PartialEq for Memo { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + } +} + +impl Deref for Memo +where + T: PartialEq, +{ + type Target = dyn Fn() -> T; + + fn deref(&self) -> &Self::Target { + Readable::deref_impl(self) + } +} diff --git a/packages/signals/src/reactive_context.rs b/packages/signals/src/reactive_context.rs index 98b611bb1..86eebb687 100644 --- a/packages/signals/src/reactive_context.rs +++ b/packages/signals/src/reactive_context.rs @@ -1,9 +1,9 @@ use dioxus_core::prelude::{ current_scope_id, has_context, provide_context, schedule_update_any, ScopeId, }; +use futures_channel::mpsc::UnboundedReceiver; use generational_box::SyncStorage; -use rustc_hash::FxHashSet; -use std::{cell::RefCell, hash::Hash, sync::Arc}; +use std::{cell::RefCell, hash::Hash}; use crate::{CopyValue, Readable, Writable}; @@ -25,66 +25,48 @@ thread_local! { impl std::fmt::Display for ReactiveContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let read = self.inner.read(); - match read.scope_subscriber { - Some(scope) => write!(f, "ReactiveContext for scope {:?}", scope), - None => { - #[cfg(debug_assertions)] - return write!(f, "ReactiveContext created at {}", read.origin); - #[cfg(not(debug_assertions))] - write!(f, "ReactiveContext") - } - } - } -} - -impl Default for ReactiveContext { - #[track_caller] - fn default() -> Self { - Self::new_for_scope(None, std::panic::Location::caller()) + #[cfg(debug_assertions)] + return write!(f, "ReactiveContext created at {}", read.origin); + #[cfg(not(debug_assertions))] + write!(f, "ReactiveContext") } } impl ReactiveContext { /// Create a new reactive context #[track_caller] - pub fn new() -> Self { - Self::default() + pub fn new() -> (Self, UnboundedReceiver<()>) { + Self::new_with_origin(std::panic::Location::caller()) } /// Create a new reactive context with a location for debugging purposes /// This is useful for reactive contexts created within closures - pub fn new_with_origin(origin: &'static std::panic::Location<'static>) -> Self { - Self::new_for_scope(None, origin) + pub fn new_with_origin( + origin: &'static std::panic::Location<'static>, + ) -> (Self, UnboundedReceiver<()>) { + let (tx, rx) = futures_channel::mpsc::unbounded(); + let callback = move || { + let _ = tx.unbounded_send(()); + }; + let _self = Self::new_with_callback(callback, current_scope_id().unwrap(), origin); + (_self, rx) } - /// Create a new reactive context that may update a scope - #[allow(unused)] - pub(crate) fn new_for_scope( - scope: Option, + /// Create a new reactive context that may update a scope. When any signal that this context subscribes to changes, the callback will be run + pub fn new_with_callback( + callback: impl FnMut() + Send + Sync + 'static, + scope: ScopeId, origin: &'static std::panic::Location<'static>, ) -> Self { - let (tx, rx) = flume::unbounded(); - - let mut scope_subscribers = FxHashSet::default(); - if let Some(scope) = scope { - scope_subscribers.insert(scope); - } - let inner = Inner { - scope_subscriber: scope, - sender: tx, self_: None, - update_any: schedule_update_any(), - receiver: rx, + update: Box::new(callback), #[cfg(debug_assertions)] origin, }; let mut self_ = Self { - inner: CopyValue::new_maybe_sync_in_scope( - inner, - scope.or_else(current_scope_id).unwrap(), - ), + inner: CopyValue::new_maybe_sync_in_scope(inner, scope), }; self_.inner.write().self_ = Some(self_); @@ -112,10 +94,17 @@ impl ReactiveContext { if let Some(cx) = has_context() { return Some(cx); } + let update_any = schedule_update_any(); + let scope_id = current_scope_id().unwrap(); + let update_scope = move || { + tracing::trace!("Marking scope {:?} as dirty", scope_id); + update_any(scope_id) + }; // Otherwise, create a new context at the current scope - Some(provide_context(ReactiveContext::new_for_scope( - current_scope_id(), + Some(provide_context(ReactiveContext::new_with_callback( + update_scope, + scope_id, std::panic::Location::caller(), ))) } @@ -137,25 +126,18 @@ impl ReactiveContext { /// /// Returns true if the context was marked as dirty, or false if the context has been dropped pub fn mark_dirty(&self) -> bool { - if let Ok(self_read) = self.inner.try_read() { + let mut copy = self.inner; + if let Ok(mut self_write) = copy.try_write() { #[cfg(debug_assertions)] { - if let Some(scope) = self_read.scope_subscriber { - tracing::trace!("Marking reactive context for scope {:?} as dirty", scope); - } else { - tracing::trace!( - "Marking reactive context created at {} as dirty", - self_read.origin - ); - } - } - if let Some(scope) = self_read.scope_subscriber { - (self_read.update_any)(scope); + tracing::trace!( + "Marking reactive context created at {} as dirty", + self_write.origin + ); } - // mark the listeners as dirty - // If the channel is full it means that the receivers have already been marked as dirty - _ = self_read.sender.try_send(()); + (self_write.update)(); + true } else { false @@ -166,12 +148,6 @@ impl ReactiveContext { pub fn origin_scope(&self) -> ScopeId { self.inner.origin_scope() } - - /// Wait for this reactive context to change - pub async fn changed(&self) { - let rx = self.inner.read().receiver.clone(); - _ = rx.recv_async().await; - } } impl Hash for ReactiveContext { @@ -181,14 +157,10 @@ impl Hash for ReactiveContext { } struct Inner { - // A scope we mark as dirty when this context is written to - scope_subscriber: Option, self_: Option, - update_any: Arc, // Futures will call .changed().await - sender: flume::Sender<()>, - receiver: flume::Receiver<()>, + update: Box, // Debug information for signal subscriptions #[cfg(debug_assertions)] diff --git a/packages/signals/src/signal.rs b/packages/signals/src/signal.rs index 68e851c4d..cdf8595bf 100644 --- a/packages/signals/src/signal.rs +++ b/packages/signals/src/signal.rs @@ -1,11 +1,9 @@ +use crate::Memo; use crate::{ read::Readable, write::Writable, CopyValue, GlobalMemo, GlobalSignal, ReactiveContext, - ReadOnlySignal, ReadableRef, -}; -use dioxus_core::{ - prelude::{spawn, IntoAttributeValue}, - ScopeId, + ReadableRef, }; +use dioxus_core::{prelude::IntoAttributeValue, ScopeId}; use generational_box::{AnyStorage, Storage, SyncStorage, UnsyncStorage}; use std::{ any::Any, @@ -88,35 +86,8 @@ impl Signal { /// /// Selectors can be used to efficiently compute derived data from signals. #[track_caller] - pub fn memo(f: impl FnMut() -> T + 'static) -> ReadOnlySignal { - Self::use_maybe_sync_memo(f) - } - - /// Creates a new Selector that may be Sync + Send. The selector will be run immediately and whenever any signal it reads changes. - /// - /// Selectors can be used to efficiently compute derived data from signals. - #[track_caller] - pub fn use_maybe_sync_memo>>( - mut f: impl FnMut() -> T + 'static, - ) -> ReadOnlySignal { - // Get the current reactive context - let rc = ReactiveContext::new(); - - // Create a new signal in that context, wiring up its dependencies and subscribers - let mut state: Signal = rc.run_in(|| Signal::new_maybe_sync(f())); - - spawn(async move { - loop { - rc.changed().await; - let new = f(); - if new != *state.peek() { - *state.write() = new; - } - } - }); - - // And just return the readonly variant of that signal - ReadOnlySignal::new_maybe_sync(state) + pub fn memo(f: impl FnMut() -> T + 'static) -> Memo { + Memo::new(f) } } @@ -237,7 +208,7 @@ impl>> Writable for Signal { } #[track_caller] - fn try_write(&self) -> Result, generational_box::BorrowMutError> { + fn try_write(&mut self) -> Result, generational_box::BorrowMutError> { self.inner.try_write().map(|inner| { let borrow = S::map_mut(inner, |v| &mut v.value); Write { diff --git a/packages/signals/src/write.rs b/packages/signals/src/write.rs index 4cba48c62..8abc54b44 100644 --- a/packages/signals/src/write.rs +++ b/packages/signals/src/write.rs @@ -27,7 +27,7 @@ pub trait Writable: Readable { } /// Try to get a mutable reference to the value. If the value has been dropped, this will panic. - fn try_write(&self) -> Result, generational_box::BorrowMutError>; + fn try_write(&mut self) -> Result, generational_box::BorrowMutError>; /// Run a function with a mutable reference to the value. If the value has been dropped, this will panic. #[track_caller] From da4d9c70e8dc81bfa234052255ded5a851924a58 Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Wed, 6 Mar 2024 11:42:31 -0600 Subject: [PATCH 4/8] fix memo chain example --- examples/memo_chain.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/memo_chain.rs b/examples/memo_chain.rs index 745c5d01e..2be2b88fb 100644 --- a/examples/memo_chain.rs +++ b/examples/memo_chain.rs @@ -21,25 +21,25 @@ fn app() -> Element { button { onclick: move |_| value += 1, "Increment" } button { onclick: move |_| depth += 1, "Add depth" } button { onclick: move |_| depth -= 1, "Remove depth" } - Child { depth, items, state } + if depth() > 0 { + Child { depth, items, state } + } } } #[component] fn Child(state: Memo, items: Memo>, depth: ReadOnlySignal) -> Element { - if depth() == 0 { - return None; - } - // These memos don't get re-computed when early returns happen let state = use_memo(move || state() + 1); - let item = use_memo(move || items()[depth()]); + let item = use_memo(move || items()[depth() - 1]); let depth = use_memo(move || depth() - 1); println!("rendering child: {}", depth()); rsx! { h3 { "Depth({depth})-Item({item}): {state}"} - Child { depth, state, items } + if depth() > 0 { + Child { depth, state, items } + } } } From 3d7f419636c989b6688c1d7d914cab10a63b54bb Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Thu, 7 Mar 2024 11:49:51 -0600 Subject: [PATCH 5/8] fix memos during suspense --- packages/core/tests/suspense.rs | 6 +-- packages/core/tests/task.rs | 4 +- packages/signals/src/copy_value.rs | 3 ++ packages/signals/src/memo.rs | 61 +++++++++++++++++++++--- packages/signals/src/reactive_context.rs | 8 ++-- packages/signals/src/signal.rs | 6 ++- 6 files changed, 71 insertions(+), 17 deletions(-) diff --git a/packages/core/tests/suspense.rs b/packages/core/tests/suspense.rs index 56de18c92..413ecdb5f 100644 --- a/packages/core/tests/suspense.rs +++ b/packages/core/tests/suspense.rs @@ -37,9 +37,9 @@ fn suspended_child() -> Element { panic!("Non-suspended task was polled"); }); - // // Memos should still work like normal - // let memo = use_memo(move || val * 2); - // assert_eq!(memo, val * 2); + // Memos should still work like normal + let memo = use_memo(move || val * 2); + assert_eq!(memo, val * 2); if val() < 3 { let task = spawn(async move { diff --git a/packages/core/tests/task.rs b/packages/core/tests/task.rs index f82b5473d..aa4cd8465 100644 --- a/packages/core/tests/task.rs +++ b/packages/core/tests/task.rs @@ -53,7 +53,7 @@ async fn running_async() { #[tokio::test] async fn yield_now_works() { thread_local! { - static SEQUENCE: std::cell::RefCell> = std::cell::RefCell::new(Vec::new()); + static SEQUENCE: std::cell::RefCell> = const { std::cell::RefCell::new(Vec::new()) }; } fn app() -> Element { @@ -88,7 +88,7 @@ async fn yield_now_works() { #[tokio::test] async fn flushing() { thread_local! { - static SEQUENCE: std::cell::RefCell> = std::cell::RefCell::new(Vec::new()); + static SEQUENCE: std::cell::RefCell> = const { std::cell::RefCell::new(Vec::new()) }; static BROADCAST: (tokio::sync::broadcast::Sender<()>, tokio::sync::broadcast::Receiver<()>) = tokio::sync::broadcast::channel(1); } diff --git a/packages/signals/src/copy_value.rs b/packages/signals/src/copy_value.rs index fb984eb6f..cb37d980f 100644 --- a/packages/signals/src/copy_value.rs +++ b/packages/signals/src/copy_value.rs @@ -237,14 +237,17 @@ impl> Writable for CopyValue { S::try_map_mut(mut_, f) } + #[track_caller] fn try_write(&mut self) -> Result, generational_box::BorrowMutError> { self.value.try_write() } + #[track_caller] fn write(&mut self) -> Self::Mut { self.value.write() } + #[track_caller] fn set(&mut self, value: T) { self.value.set(value); } diff --git a/packages/signals/src/memo.rs b/packages/signals/src/memo.rs index e8ad0ec44..e5761eb58 100644 --- a/packages/signals/src/memo.rs +++ b/packages/signals/src/memo.rs @@ -1,6 +1,7 @@ use crate::write::Writable; use crate::{read::Readable, ReactiveContext, ReadableRef, Signal}; use crate::{CopyValue, ReadOnlySignal}; +use std::rc::Rc; use std::{ cell::RefCell, ops::Deref, @@ -11,6 +12,33 @@ use std::{ use dioxus_core::prelude::*; use futures_util::StreamExt; use generational_box::UnsyncStorage; +use once_cell::sync::OnceCell; + +/// A thread local that can only be read from the thread it was created on. +pub struct ThreadLocal { + value: T, + owner: std::thread::ThreadId, +} + +impl ThreadLocal { + /// Create a new thread local. + pub fn new(value: T) -> Self { + ThreadLocal { + value, + owner: std::thread::current().id(), + } + } + + /// Get the value of the thread local. + pub fn get(&self) -> Option<&T> { + (self.owner == std::thread::current().id()).then_some(&self.value) + } +} + +// SAFETY: This is safe because the thread local can only be read from the thread it was created on. +unsafe impl Send for ThreadLocal {} +unsafe impl Sync for ThreadLocal {} + struct UpdateInformation { dirty: Arc, callback: RefCell T>>, @@ -41,11 +69,26 @@ impl Memo { let dirty = Arc::new(AtomicBool::new(true)); let (tx, mut rx) = futures_channel::mpsc::unbounded(); + let myself: Rc>> = Rc::new(OnceCell::new()); + let thread_local = ThreadLocal::new(myself.clone()); + let callback = { let dirty = dirty.clone(); - move || { - dirty.store(true, std::sync::atomic::Ordering::Relaxed); - tx.unbounded_send(()).unwrap(); + move || match thread_local.get() { + Some(memo) => match memo.get() { + Some(memo) => { + memo.recompute(); + } + None => { + tracing::error!("Memo was not initialized in the same thread it was created in. This is likely a bug in dioxus"); + dirty.store(true, std::sync::atomic::Ordering::Relaxed); + let _ = tx.unbounded_send(()); + } + }, + None => { + dirty.store(true, std::sync::atomic::Ordering::Relaxed); + let _ = tx.unbounded_send(()); + } } }; let rc = ReactiveContext::new_with_callback( @@ -55,8 +98,9 @@ impl Memo { ); // Create a new signal in that context, wiring up its dependencies and subscribers - let value = rc.run_in(&mut f); - let recompute = RefCell::new(Box::new(f) as Box T>); + let mut recompute = move || rc.run_in(&mut f); + let value = recompute(); + let recompute = RefCell::new(Box::new(recompute) as Box T>); let update = CopyValue::new(UpdateInformation { dirty, callback: recompute, @@ -67,9 +111,12 @@ impl Memo { inner: state, update, }; + let _ = myself.set(memo); spawn(async move { while rx.next().await.is_some() { + // Remove any pending updates + while rx.try_next().is_ok() {} memo.recompute(); } }); @@ -78,6 +125,7 @@ impl Memo { } /// Rerun the computation and update the value of the memo if the result has changed. + #[tracing::instrument(skip(self))] fn recompute(&self) where T: PartialEq, @@ -89,8 +137,7 @@ impl Memo { if new_value != *peak { drop(peak); let mut copy = self.inner; - let mut write = copy.write(); - *write = new_value; + copy.set(new_value); update_write .dirty .store(false, std::sync::atomic::Ordering::Relaxed); diff --git a/packages/signals/src/reactive_context.rs b/packages/signals/src/reactive_context.rs index 86eebb687..a1b08db24 100644 --- a/packages/signals/src/reactive_context.rs +++ b/packages/signals/src/reactive_context.rs @@ -24,10 +24,12 @@ thread_local! { impl std::fmt::Display for ReactiveContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let read = self.inner.read(); #[cfg(debug_assertions)] - return write!(f, "ReactiveContext created at {}", read.origin); - #[cfg(not(debug_assertions))] + { + if let Ok(read) = self.inner.try_read() { + return write!(f, "ReactiveContext created at {}", read.origin); + } + } write!(f, "ReactiveContext") } } diff --git a/packages/signals/src/signal.rs b/packages/signals/src/signal.rs index cdf8595bf..80fb7953d 100644 --- a/packages/signals/src/signal.rs +++ b/packages/signals/src/signal.rs @@ -150,8 +150,10 @@ impl>> Signal { { let inner = self.inner.read(); - let mut subscribers = inner.subscribers.lock().unwrap(); - subscribers.retain(|reactive_context| reactive_context.mark_dirty()) + // We cannot hold the subscribers lock while calling mark_dirty, because mark_dirty can run user code which may cause a new subscriber to be added. If we hold the lock, we will deadlock. + let mut subscribers = std::mem::take(&mut *inner.subscribers.lock().unwrap()); + subscribers.retain(|reactive_context| reactive_context.mark_dirty()); + *inner.subscribers.lock().unwrap() = subscribers; } } From e72bf5820fe6edfd8c1dbed6089b066472b82edc Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Thu, 7 Mar 2024 20:47:43 -0600 Subject: [PATCH 6/8] reuse remove_task in handle_task_wakeup --- packages/core/src/tasks.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/core/src/tasks.rs b/packages/core/src/tasks.rs index 2335afac3..a700b4503 100644 --- a/packages/core/src/tasks.rs +++ b/packages/core/src/tasks.rs @@ -171,11 +171,7 @@ impl Runtime { .borrow_mut() .remove(&id); - // Remove it from the scheduler - self.tasks.borrow_mut().try_remove(id.0); - - // Remove it from the suspended tasks - self.suspended_tasks.borrow_mut().remove(&id); + self.remove_task(id); } // Remove the scope from the stack From cca92b9fed98efe17149ce024dd7652b9d92188d Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Thu, 7 Mar 2024 20:51:19 -0600 Subject: [PATCH 7/8] remove extra logging --- packages/core/src/virtual_dom.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/core/src/virtual_dom.rs b/packages/core/src/virtual_dom.rs index dec5b261a..fc86b70bc 100644 --- a/packages/core/src/virtual_dom.rs +++ b/packages/core/src/virtual_dom.rs @@ -661,7 +661,6 @@ impl VirtualDom { // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures if self.dirty_scopes.has_dirty_scopes() { - println!("dirty scopes"); break; } From 9bad802beb9e99f28a461b18391d502fd9cef6cd Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Thu, 7 Mar 2024 21:04:48 -0600 Subject: [PATCH 8/8] clean up dirty scopes module --- packages/core/src/dirty_scope.rs | 77 +++++++++++------- packages/core/src/virtual_dom.rs | 79 ++++++++----------- .../examples/static-hydrated/src/main.rs | 2 +- 3 files changed, 79 insertions(+), 79 deletions(-) diff --git a/packages/core/src/dirty_scope.rs b/packages/core/src/dirty_scope.rs index 6e2d71192..4cfb187ef 100644 --- a/packages/core/src/dirty_scope.rs +++ b/packages/core/src/dirty_scope.rs @@ -29,9 +29,9 @@ use crate::ScopeId; use crate::Task; +use crate::VirtualDom; use std::borrow::Borrow; use std::cell::RefCell; -use std::collections::BTreeSet; use std::hash::Hash; #[derive(Debug, Clone, Copy, Eq)] @@ -70,50 +70,71 @@ impl Hash for ScopeOrder { } } -#[derive(Debug, Default)] -pub struct DirtyScopes { - pub(crate) scopes: BTreeSet, - pub(crate) tasks: BTreeSet, -} - -impl DirtyScopes { +impl VirtualDom { /// Queue a task to be polled - pub fn queue_task(&mut self, task: Task, order: ScopeOrder) { - match self.tasks.get(&order) { + pub(crate) fn queue_task(&mut self, task: Task, order: ScopeOrder) { + match self.dirty_tasks.get(&order) { Some(scope) => scope.queue_task(task), None => { let scope = DirtyTasks::from(order); scope.queue_task(task); - self.tasks.insert(scope); + self.dirty_tasks.insert(scope); } } } /// Queue a scope to be rerendered - pub fn queue_scope(&mut self, order: ScopeOrder) { - self.scopes.insert(order); + pub(crate) fn queue_scope(&mut self, order: ScopeOrder) { + self.dirty_scopes.insert(order); } /// Check if there are any dirty scopes - pub fn has_dirty_scopes(&self) -> bool { - !self.scopes.is_empty() + pub(crate) fn has_dirty_scopes(&self) -> bool { + !self.dirty_scopes.is_empty() } /// Take any tasks from the highest scope - pub fn pop_task(&mut self) -> Option { - self.tasks.pop_first() + pub(crate) fn pop_task(&mut self) -> Option { + let mut task = self.dirty_tasks.pop_first()?; + + // If the scope doesn't exist for whatever reason, then we should skip it + while !self.scopes.contains(task.order.id.0) { + task = self.dirty_tasks.pop_first()?; + } + + Some(task) } /// Take any work from the highest scope. This may include rerunning the scope and/or running tasks - pub fn pop_work(&mut self) -> Option { - let dirty_scope = self.scopes.first(); - let dirty_task = self.tasks.first(); + pub(crate) fn pop_work(&mut self) -> Option { + let mut dirty_scope = self.dirty_scopes.first(); + // Pop any invalid scopes off of each dirty task; + while let Some(scope) = dirty_scope { + if !self.scopes.contains(scope.id.0) { + self.dirty_scopes.pop_first(); + dirty_scope = self.dirty_scopes.first(); + } else { + break; + } + } + + let mut dirty_task = self.dirty_tasks.first(); + // Pop any invalid tasks off of each dirty scope; + while let Some(task) = dirty_task { + if !self.scopes.contains(task.order.id.0) { + self.dirty_tasks.pop_first(); + dirty_task = self.dirty_tasks.first(); + } else { + break; + } + } + match (dirty_scope, dirty_task) { (Some(scope), Some(task)) => { let tasks_order = task.borrow(); match scope.cmp(tasks_order) { std::cmp::Ordering::Less => { - let scope = self.scopes.pop_first().unwrap(); + let scope = self.dirty_scopes.pop_first().unwrap(); Some(Work { scope, rerun_scope: true, @@ -121,7 +142,7 @@ impl DirtyScopes { }) } std::cmp::Ordering::Greater => { - let task = self.tasks.pop_first().unwrap(); + let task = self.dirty_tasks.pop_first().unwrap(); Some(Work { scope: task.order, rerun_scope: false, @@ -129,8 +150,8 @@ impl DirtyScopes { }) } std::cmp::Ordering::Equal => { - let scope = self.scopes.pop_first().unwrap(); - let task = self.tasks.pop_first().unwrap(); + let scope = self.dirty_scopes.pop_first().unwrap(); + let task = self.dirty_tasks.pop_first().unwrap(); Some(Work { scope, rerun_scope: true, @@ -140,7 +161,7 @@ impl DirtyScopes { } } (Some(_), None) => { - let scope = self.scopes.pop_first().unwrap(); + let scope = self.dirty_scopes.pop_first().unwrap(); Some(Work { scope, rerun_scope: true, @@ -148,7 +169,7 @@ impl DirtyScopes { }) } (None, Some(_)) => { - let task = self.tasks.pop_first().unwrap(); + let task = self.dirty_tasks.pop_first().unwrap(); Some(Work { scope: task.order, rerun_scope: false, @@ -158,10 +179,6 @@ impl DirtyScopes { (None, None) => None, } } - - pub fn remove(&mut self, scope: &ScopeOrder) { - self.scopes.remove(scope); - } } #[derive(Debug)] diff --git a/packages/core/src/virtual_dom.rs b/packages/core/src/virtual_dom.rs index fc86b70bc..28a237204 100644 --- a/packages/core/src/virtual_dom.rs +++ b/packages/core/src/virtual_dom.rs @@ -2,14 +2,14 @@ //! //! This module provides the primary mechanics to create a hook-based, concurrent VDOM for Rust. -use crate::innerlude::ScopeOrder; +use crate::innerlude::{DirtyTasks, ScopeOrder}; use crate::Task; use crate::{ any_props::AnyProps, arena::ElementId, innerlude::{ - DirtyScopes, ElementRef, ErrorBoundary, NoOpMutations, SchedulerMsg, ScopeState, - VNodeMount, VProps, WriteMutations, + ElementRef, ErrorBoundary, NoOpMutations, SchedulerMsg, ScopeState, VNodeMount, VProps, + WriteMutations, }, nodes::RenderReturn, nodes::{Template, TemplateId}, @@ -20,6 +20,7 @@ use crate::{ use futures_util::StreamExt; use rustc_hash::FxHashMap; use slab::Slab; +use std::collections::BTreeSet; use std::{any::Any, rc::Rc}; use tracing::instrument; @@ -185,7 +186,8 @@ use tracing::instrument; pub struct VirtualDom { pub(crate) scopes: Slab, - pub(crate) dirty_scopes: DirtyScopes, + pub(crate) dirty_scopes: BTreeSet, + pub(crate) dirty_tasks: BTreeSet, // Maps a template path to a map of byte indexes to templates pub(crate) templates: FxHashMap>, @@ -312,6 +314,7 @@ impl VirtualDom { runtime: Runtime::new(tx), scopes: Default::default(), dirty_scopes: Default::default(), + dirty_tasks: Default::default(), templates: Default::default(), queued_templates: Default::default(), elements: Default::default(), @@ -376,7 +379,8 @@ impl VirtualDom { tracing::event!(tracing::Level::TRACE, "Marking scope {:?} as dirty", id); let order = ScopeOrder::new(scope.height(), id); - self.dirty_scopes.queue_scope(order); + drop(scope); + self.queue_scope(order); } /// Mark a task as dirty @@ -396,7 +400,8 @@ impl VirtualDom { ); let order = ScopeOrder::new(scope.height(), scope.id); - self.dirty_scopes.queue_task(task, order); + drop(scope); + self.queue_task(task, order); } /// Call a listener inside the VirtualDom with data from outside the VirtualDom. **The ElementId passed in must be the id of an element with a listener, not a static node or a text node.** @@ -450,7 +455,7 @@ impl VirtualDom { self.process_events(); // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures - if self.dirty_scopes.has_dirty_scopes() { + if self.has_dirty_scopes() { return; } @@ -491,7 +496,7 @@ impl VirtualDom { self.queue_events(); // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures - if self.dirty_scopes.has_dirty_scopes() { + if self.has_dirty_scopes() { return; } @@ -505,19 +510,14 @@ impl VirtualDom { let _runtime = RuntimeGuard::new(self.runtime.clone()); // Next, run any queued tasks // We choose not to poll the deadline since we complete pretty quickly anyways - while let Some(task) = self.dirty_scopes.pop_task() { - // If the scope doesn't exist for whatever reason, then we should skip it - if !self.scopes.contains(task.order.id.0) { - continue; - } - + while let Some(task) = self.pop_task() { // Then poll any tasks that might be pending let tasks = task.tasks_queued.into_inner(); for task in tasks { let _ = self.runtime.handle_task_wakeup(task); // Running that task, may mark a scope higher up as dirty. If it does, return from the function early self.queue_events(); - if self.dirty_scopes.has_dirty_scopes() { + if self.has_dirty_scopes() { return; } } @@ -609,12 +609,7 @@ impl VirtualDom { // Next, diff any dirty scopes // We choose not to poll the deadline since we complete pretty quickly anyways - while let Some(work) = self.dirty_scopes.pop_work() { - // If the scope doesn't exist for whatever reason, then we should skip it - if !self.scopes.contains(work.scope.id.0) { - continue; - } - + while let Some(work) = self.pop_work() { { let _runtime = RuntimeGuard::new(self.runtime.clone()); // Then, poll any tasks that might be pending in the scope @@ -660,7 +655,7 @@ impl VirtualDom { self.queue_events(); // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures - if self.dirty_scopes.has_dirty_scopes() { + if self.has_dirty_scopes() { break; } @@ -669,12 +664,7 @@ impl VirtualDom { let _runtime = RuntimeGuard::new(self.runtime.clone()); // Next, run any queued tasks // We choose not to poll the deadline since we complete pretty quickly anyways - while let Some(task) = self.dirty_scopes.pop_task() { - // If the scope doesn't exist for whatever reason, then we should skip it - if !self.scopes.contains(task.order.id.0) { - continue; - } - + while let Some(task) = self.pop_task() { // Then poll any tasks that might be pending let tasks = task.tasks_queued.into_inner(); for task in tasks { @@ -682,7 +672,7 @@ impl VirtualDom { let _ = self.runtime.handle_task_wakeup(task); // Running that task, may mark a scope higher up as dirty. If it does, return from the function early self.queue_events(); - if self.dirty_scopes.has_dirty_scopes() { + if self.has_dirty_scopes() { break 'wait_for_work; } } @@ -694,27 +684,20 @@ impl VirtualDom { } // Render whatever work needs to be rendered, unlocking new futures and suspense leaves - while let Some(work) = self.dirty_scopes.pop_work() { - // If the scope doesn't exist for whatever reason, then we should skip it - if !self.scopes.contains(work.scope.id.0) { - continue; + let _runtime = RuntimeGuard::new(self.runtime.clone()); + while let Some(work) = self.pop_work() { + // Then, poll any tasks that might be pending in the scope + for task in work.tasks { + // During suspense, we only want to run tasks that are suspended + if self.runtime.suspended_tasks.borrow().contains(&task) { + let _ = self.runtime.handle_task_wakeup(task); + } } + // If the scope is dirty, run the scope and get the mutations + if work.rerun_scope { + let new_nodes = self.run_scope(work.scope.id); - { - let _runtime = RuntimeGuard::new(self.runtime.clone()); - // Then, poll any tasks that might be pending in the scope - for task in work.tasks { - // During suspense, we only want to run tasks that are suspended - if self.runtime.suspended_tasks.borrow().contains(&task) { - let _ = self.runtime.handle_task_wakeup(task); - } - } - // If the scope is dirty, run the scope and get the mutations - if work.rerun_scope { - let new_nodes = self.run_scope(work.scope.id); - - self.diff_scope(&mut NoOpMutations, work.scope.id, new_nodes); - } + self.diff_scope(&mut NoOpMutations, work.scope.id, new_nodes); } } } diff --git a/packages/fullstack/examples/static-hydrated/src/main.rs b/packages/fullstack/examples/static-hydrated/src/main.rs index e9d2c77c7..3fb981cee 100644 --- a/packages/fullstack/examples/static-hydrated/src/main.rs +++ b/packages/fullstack/examples/static-hydrated/src/main.rs @@ -36,7 +36,7 @@ async fn main() { } // Hydrate the page -#[cfg(not(feature = "server"))] +#[cfg(all(feature = "web", not(feature = "server")))] fn main() { dioxus_web::launch_with_props( dioxus_fullstack::router::RouteWithCfg::,