From a14e5be7ad1369597706d31d36b64f8d354af165 Mon Sep 17 00:00:00 2001 From: Evan Almloff Date: Fri, 1 Mar 2024 12:46:07 -0600 Subject: [PATCH] Fix effect ordering after reruns --- examples/readme.rs | 5 +++ packages/core/src/global_context.rs | 5 +-- packages/core/src/lib.rs | 1 + packages/core/src/render_signal.rs | 63 +++++++++++++++++++++++++++++ packages/core/src/runtime.rs | 30 ++------------ packages/core/src/virtual_dom.rs | 26 +++++------- packages/core/tests/task.rs | 2 +- packages/hooks/src/use_effect.rs | 8 ++-- 8 files changed, 89 insertions(+), 51 deletions(-) create mode 100644 packages/core/src/render_signal.rs diff --git a/examples/readme.rs b/examples/readme.rs index be4128a99..bf7c31fa3 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -16,6 +16,11 @@ fn app() -> Element { let len = vec.len(); + println!("app len: {}", len); + use_effect(move || { + println!("app effect len: {}", vec.len()); + }); + rsx! { button { onclick: move |_| { diff --git a/packages/core/src/global_context.rs b/packages/core/src/global_context.rs index 453cba33e..995abd58a 100644 --- a/packages/core/src/global_context.rs +++ b/packages/core/src/global_context.rs @@ -260,10 +260,7 @@ pub fn after_render(f: impl FnMut() + 'static) { pub async fn flush_sync() { // Wait for the flush lock to be available // We release it immediately, so it's impossible for the lock to be held longer than this function - Runtime::with(|rt| rt.flush_mutex.clone()) - .unwrap() - .lock() - .await; + Runtime::with(|rt| rt.render_signal.subscribe()).unwrap().await; } /// Use a hook with a cleanup function diff --git a/packages/core/src/lib.rs b/packages/core/src/lib.rs index afa80eb84..957ba543a 100644 --- a/packages/core/src/lib.rs +++ b/packages/core/src/lib.rs @@ -20,6 +20,7 @@ mod scope_context; mod scopes; mod tasks; mod virtual_dom; +mod render_signal; pub(crate) mod innerlude { pub(crate) use crate::any_props::*; diff --git a/packages/core/src/render_signal.rs b/packages/core/src/render_signal.rs new file mode 100644 index 000000000..7f4421337 --- /dev/null +++ b/packages/core/src/render_signal.rs @@ -0,0 +1,63 @@ +use std::rc::Rc; +use std::task::Waker; +use std::task::Poll; +use std::pin::Pin; +use std::future::Future; +use std::task::Context; +use std::cell::RefCell; + +/// A signal is a message that can be sent to all listening tasks at once +#[derive(Default)] +pub struct RenderSignal { + wakers: Rc>>>>, +} + +impl RenderSignal { + /// Send the signal to all listening tasks + pub fn send(&self) { + let mut wakers = self.wakers.borrow_mut(); + for waker in wakers.drain(..) { + let mut inner = waker.borrow_mut(); + inner.resolved = true; + if let Some(waker) = inner.waker.take() { + waker.wake(); + } + } + } + + /// Create a future that resolves when the signal is sent + pub fn subscribe(& self) -> RenderSignalFuture { + let inner =Rc::new(RefCell::new(RenderSignalFutureInner { + resolved: false, + waker: None, + })); + self.wakers.borrow_mut().push(inner.clone()); + let waker = RenderSignalFuture { + inner + }; + waker + } +} + +struct RenderSignalFutureInner { + resolved: bool, + waker: Option, +} + +pub(crate) struct RenderSignalFuture { + inner: Rc>, +} + +impl Future for RenderSignalFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let mut inner = self.inner.borrow_mut(); + if inner.resolved { + Poll::Ready(()) + } else { + inner.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} diff --git a/packages/core/src/runtime.rs b/packages/core/src/runtime.rs index 8493a901f..fc1ccd9d7 100644 --- a/packages/core/src/runtime.rs +++ b/packages/core/src/runtime.rs @@ -3,11 +3,11 @@ use crate::{ scope_context::Scope, scopes::ScopeId, Task, + render_signal::RenderSignal }; use std::{ cell::{Cell, Ref, RefCell}, rc::Rc, - sync::Arc, }; thread_local! { @@ -31,18 +31,15 @@ pub struct Runtime { pub(crate) sender: futures_channel::mpsc::UnboundedSender, - // the virtualdom will hold this lock while it's doing syncronous work - // when the lock is lifted, tasks waiting for the lock will be able to run - pub(crate) flush_mutex: Arc>, - pub(crate) flush_lock: Cell>>, + // Synchronous tasks need to be run after the next render. The virtual dom stores a list of those tasks to send a signal to them when the next render is done. + pub(crate) render_signal: RenderSignal, } impl Runtime { pub(crate) fn new(sender: futures_channel::mpsc::UnboundedSender) -> Rc { Rc::new(Self { sender, - flush_mutex: Default::default(), - flush_lock: Default::default(), + render_signal: RenderSignal::default(), rendering: Cell::new(true), scope_states: Default::default(), scope_stack: Default::default(), @@ -149,25 +146,6 @@ impl Runtime { pub(crate) fn with_scope(scope: ScopeId, f: impl FnOnce(&Scope) -> R) -> Option { Self::with(|rt| rt.get_state(scope).map(|sc| f(&sc))).flatten() } - - /// Acquire the flush lock and store it interally - /// - /// This means the virtual dom is currently doing syncronous work - /// The lock will be held until `release_flush_lock` is called - and then the OwnedLock will be dropped - pub(crate) fn acquire_flush_lock(&self) { - // The flush lock might already be held... - if let Some(lock) = self.flush_mutex.try_lock_owned() { - self.flush_lock.set(Some(lock)); - } - } - - /// Release the flush lock - /// - /// On the drop of the flush lock, all tasks waiting on `flush_sync` will spring to life via their wakers. - /// You can now freely poll those tasks and they can progress - pub(crate) fn release_flush_lock(&self) { - self.flush_lock.take(); - } } /// A guard for a new runtime. This must be used to override the current runtime when importing components from a dynamic library that has it's own runtime. diff --git a/packages/core/src/virtual_dom.rs b/packages/core/src/virtual_dom.rs index 88a404412..d891faaa4 100644 --- a/packages/core/src/virtual_dom.rs +++ b/packages/core/src/virtual_dom.rs @@ -462,10 +462,6 @@ impl VirtualDom { /// async fn poll_tasks(&mut self) { - // Release the flush lock - // This will cause all the flush wakers to immediately spring to life, which we will off with process_events - self.runtime.release_flush_lock(); - loop { // Process all events - Scopes are marked dirty, etc // Sometimes when wakers fire we get a slew of updates at once, so its important that we drain this completely @@ -479,13 +475,6 @@ impl VirtualDom { // Make sure we set the runtime since we're running user code let _runtime = RuntimeGuard::new(self.runtime.clone()); - // Hold a lock to the flush sync to prevent tasks from running in the event we get an immediate - // When we're doing awaiting the rx, the lock will be dropped and tasks waiting on the lock will get waked - // We have to own the lock since poll_tasks is cancel safe - the future that this is running in might get dropped - // and if we held the lock in the scope, the lock would also get dropped prematurely - self.runtime.release_flush_lock(); - self.runtime.acquire_flush_lock(); - match self.rx.next().await.expect("channel should never close") { SchedulerMsg::Immediate(id) => self.mark_dirty(id), SchedulerMsg::TaskNotified(id) => { @@ -498,10 +487,8 @@ impl VirtualDom { } } - /// Process all events in the queue until there are no more left - pub fn process_events(&mut self) { - let _runtime = RuntimeGuard::new(self.runtime.clone()); - + /// Queue any pending events + fn queue_events(&mut self) { // Prevent a task from deadlocking the runtime by repeatedly queueing itself while let Ok(Some(msg)) = self.rx.try_next() { match msg { @@ -509,6 +496,12 @@ impl VirtualDom { SchedulerMsg::TaskNotified(task) => self.mark_task_dirty(task), } } + } + + /// Process all events in the queue until there are no more left + pub fn process_events(&mut self) { + let _runtime = RuntimeGuard::new(self.runtime.clone()); + self.queue_events(); // Now that we have collected all queued work, we should check if we have any dirty scopes. If there are not, then we can poll any queued futures if self.scopes_need_rerun { @@ -527,7 +520,7 @@ impl VirtualDom { for task in dirty.tasks_queued.borrow().iter() { let _ = self.runtime.handle_task_wakeup(*task); // Running that task, may mark a scope higher up as dirty. If it does, return from the function early - self.process_events(); + self.queue_events(); if self.scopes_need_rerun { return; } @@ -639,6 +632,7 @@ impl VirtualDom { } self.scopes_need_rerun = false; + self.runtime.render_signal.send(); } /// [`Self::render_immediate`] to a vector of mutations for testing purposes diff --git a/packages/core/tests/task.rs b/packages/core/tests/task.rs index c3d282547..f1192685f 100644 --- a/packages/core/tests/task.rs +++ b/packages/core/tests/task.rs @@ -125,7 +125,7 @@ async fn flushing() { let fut = async { // Trigger the flush by waiting for work - for _ in 0..10 { + for _ in 0..30 { dom.mark_dirty(ScopeId(0)); BROADCAST.with(|b| b.0.send(()).unwrap()); dom.wait_for_work().await; diff --git a/packages/hooks/src/use_effect.rs b/packages/hooks/src/use_effect.rs index 7811d5171..52e39dead 100644 --- a/packages/hooks/src/use_effect.rs +++ b/packages/hooks/src/use_effect.rs @@ -16,14 +16,14 @@ pub fn use_effect(mut callback: impl FnMut() + 'static) { spawn(async move { let rc = ReactiveContext::new_with_origin(location); loop { - // Wait for the dom the be finished with sync work - // flush_sync().await; - // Run the effect rc.run_in(&mut callback); - + // Wait for context to change rc.changed().await; + + // Wait for the dom the be finished with sync work + flush_sync().await; } }); });