Fix effect ordering after reruns

Evan Almloff 2024-03-01 12:46:07 -06:00
parent 26f5fb80cb
commit a14e5be7ad
8 changed files with 89 additions and 51 deletions

View file

@@ -16,6 +16,11 @@ fn app() -> Element {
let len = vec.len();
println!("app len: {}", len);
use_effect(move || {
println!("app effect len: {}", vec.len());
});
rsx! {
button {
onclick: move |_| {
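
For orientation, a sketch of roughly what this example looks like with the hunk applied. Only the use_effect block is visible in the diff; the signal setup and the button body below are assumptions, not part of the commit. The point of the fix: clicking the button re-renders the app (printing the new length) before the effect reruns (printing the same length).

use dioxus::prelude::*;

fn app() -> Element {
    let mut vec = use_signal(|| vec![0]);
    let len = vec.len();
    println!("app len: {}", len);
    use_effect(move || {
        // After this commit, this closure reruns only once the render
        // triggered by the click has completed, so it observes the new length.
        println!("app effect len: {}", vec.len());
    });
    rsx! {
        button {
            // Assumed handler body: any write that marks the scope dirty.
            onclick: move |_| {
                vec.push(0);
            },
            "Push"
        }
    }
}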

View file

@@ -260,10 +260,7 @@ pub fn after_render(f: impl FnMut() + 'static) {
pub async fn flush_sync() {
// Wait for the flush lock to be available
// We release it immediately, so it's impossible for the lock to be held longer than this function
Runtime::with(|rt| rt.flush_mutex.clone())
.unwrap()
.lock()
.await;
Runtime::with(|rt| rt.render_signal.subscribe()).unwrap().await;
}
/// Use a hook with a cleanup function
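
The new implementation makes flush_sync a plain subscription to the render signal: the calling task suspends until the next render pass completes, and no mutex is involved. A minimal usage sketch, assuming flush_sync is reachable through the prelude as in the 0.5-era API:

use dioxus::prelude::*;

fn component() -> Element {
    use_hook(|| {
        // Spawn once; the task parks until the next render finishes.
        spawn(async move {
            // Resumes when the virtual dom calls render_signal.send()
            // at the end of its next render pass.
            flush_sync().await;
            println!("a render just completed");
        });
    });
    rsx! { div { "hello" } }
}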

View file

@@ -20,6 +20,7 @@ mod scope_context;
mod scopes;
mod tasks;
mod virtual_dom;
mod render_signal;
pub(crate) mod innerlude {
pub(crate) use crate::any_props::*;

View file

@@ -0,0 +1,63 @@
use std::rc::Rc;
use std::task::Waker;
use std::task::Poll;
use std::pin::Pin;
use std::future::Future;
use std::task::Context;
use std::cell::RefCell;
/// A signal is a message that can be sent to all listening tasks at once
#[derive(Default)]
pub struct RenderSignal {
wakers: Rc<RefCell<Vec<Rc<RefCell<RenderSignalFutureInner>>>>>,
}
impl RenderSignal {
/// Send the signal to all listening tasks
pub fn send(&self) {
let mut wakers = self.wakers.borrow_mut();
for waker in wakers.drain(..) {
let mut inner = waker.borrow_mut();
inner.resolved = true;
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
}
/// Create a future that resolves when the signal is sent
pub fn subscribe(&self) -> RenderSignalFuture {
let inner = Rc::new(RefCell::new(RenderSignalFutureInner {
resolved: false,
waker: None,
}));
self.wakers.borrow_mut().push(inner.clone());
let waker = RenderSignalFuture {
inner
};
waker
}
}
struct RenderSignalFutureInner {
resolved: bool,
waker: Option<Waker>,
}
pub(crate) struct RenderSignalFuture {
inner: Rc<RefCell<RenderSignalFutureInner>>,
}
impl Future for RenderSignalFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut inner = self.inner.borrow_mut();
if inner.resolved {
Poll::Ready(())
} else {
inner.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
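
A standalone illustration of the contract (a hypothetical in-crate test, not part of the commit, since RenderSignalFuture is pub(crate); it also assumes the futures crate is available): every subscription taken before send() resolves as soon as it is polled, while one taken afterwards stays pending until the next send.

use std::future::Future;
use std::pin::Pin;
use std::task::Context;

fn render_signal_demo() {
    let signal = RenderSignal::default();

    // Subscriptions registered before the signal fires...
    let a = signal.subscribe();
    let b = signal.subscribe();
    signal.send(); // drains the waker list and marks each future resolved

    // ...complete as soon as they are polled.
    futures::executor::block_on(async {
        a.await;
        b.await;
    });

    // A subscription created after the send is not retroactively resolved.
    let mut c = signal.subscribe();
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert!(Pin::new(&mut c).poll(&mut cx).is_pending());
}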

View file

@@ -3,11 +3,11 @@ use crate::{
scope_context::Scope,
scopes::ScopeId,
Task,
render_signal::RenderSignal
};
use std::{
cell::{Cell, Ref, RefCell},
rc::Rc,
sync::Arc,
};
thread_local! {
@@ -31,18 +31,15 @@ pub struct Runtime {
pub(crate) sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
// the virtualdom will hold this lock while it's doing synchronous work
// when the lock is lifted, tasks waiting for the lock will be able to run
pub(crate) flush_mutex: Arc<futures_util::lock::Mutex<()>>,
pub(crate) flush_lock: Cell<Option<futures_util::lock::OwnedMutexGuard<()>>>,
// Synchronous tasks need to run after the next render. The virtual dom stores a list of those tasks so it can send them a signal when the next render is done.
pub(crate) render_signal: RenderSignal,
}
impl Runtime {
pub(crate) fn new(sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>) -> Rc<Self> {
Rc::new(Self {
sender,
flush_mutex: Default::default(),
flush_lock: Default::default(),
render_signal: RenderSignal::default(),
rendering: Cell::new(true),
scope_states: Default::default(),
scope_stack: Default::default(),
@@ -149,25 +146,6 @@ impl Runtime {
pub(crate) fn with_scope<R>(scope: ScopeId, f: impl FnOnce(&Scope) -> R) -> Option<R> {
Self::with(|rt| rt.get_state(scope).map(|sc| f(&sc))).flatten()
}
/// Acquire the flush lock and store it internally
///
/// This means the virtual dom is currently doing synchronous work
/// The lock will be held until `release_flush_lock` is called - and then the OwnedLock will be dropped
pub(crate) fn acquire_flush_lock(&self) {
// The flush lock might already be held...
if let Some(lock) = self.flush_mutex.try_lock_owned() {
self.flush_lock.set(Some(lock));
}
}
/// Release the flush lock
///
/// On the drop of the flush lock, all tasks waiting on `flush_sync` will spring to life via their wakers.
/// You can now freely poll those tasks and they can progress
pub(crate) fn release_flush_lock(&self) {
self.flush_lock.take();
}
}
/// A guard for a new runtime. This must be used to override the current runtime when importing components from a dynamic library that has its own runtime.

View file

@@ -462,10 +462,6 @@ impl VirtualDom {
///
async fn poll_tasks(&mut self) {
// Release the flush lock
// This will cause all the flush wakers to immediately spring to life, which we then handle with process_events
self.runtime.release_flush_lock();
loop {
// Process all events - Scopes are marked dirty, etc
// Sometimes when wakers fire we get a slew of updates at once, so it's important that we drain this completely
@@ -479,13 +475,6 @@
// Make sure we set the runtime since we're running user code
let _runtime = RuntimeGuard::new(self.runtime.clone());
// Hold a lock to the flush sync to prevent tasks from running in the event we get an immediate
// While we're awaiting the rx, the lock will be dropped and tasks waiting on the lock will get woken
// We have to own the lock since poll_tasks is cancel safe - the future that this is running in might get dropped
// and if we held the lock in the scope, the lock would also get dropped prematurely
self.runtime.release_flush_lock();
self.runtime.acquire_flush_lock();
match self.rx.next().await.expect("channel should never close") {
SchedulerMsg::Immediate(id) => self.mark_dirty(id),
SchedulerMsg::TaskNotified(id) => {
@@ -498,10 +487,8 @@
}
}
/// Process all events in the queue until there are no more left
pub fn process_events(&mut self) {
let _runtime = RuntimeGuard::new(self.runtime.clone());
/// Queue any pending events
fn queue_events(&mut self) {
// Prevent a task from deadlocking the runtime by repeatedly queueing itself
while let Ok(Some(msg)) = self.rx.try_next() {
match msg {
@@ -509,6 +496,12 @@
SchedulerMsg::TaskNotified(task) => self.mark_task_dirty(task),
}
}
}
/// Process all events in the queue until there are no more left
pub fn process_events(&mut self) {
let _runtime = RuntimeGuard::new(self.runtime.clone());
self.queue_events();
// Now that we have collected all queued work, we should check if we have any dirty scopes. If there are none, we can poll any queued futures
if self.scopes_need_rerun {
@@ -527,7 +520,7 @@
for task in dirty.tasks_queued.borrow().iter() {
let _ = self.runtime.handle_task_wakeup(*task);
// Running that task may mark a scope higher up as dirty. If it does, return from the function early
self.process_events();
self.queue_events();
if self.scopes_need_rerun {
return;
}
@@ -639,6 +632,7 @@
}
self.scopes_need_rerun = false;
self.runtime.render_signal.send();
}
/// [`Self::render_immediate`] to a vector of mutations for testing purposes
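
Taken together, these virtual_dom changes replace the lock juggling with a one-way ordering: render_immediate finishes all synchronous work and only then fires render_signal.send(), waking every task parked in flush_sync. A minimal simulation of that ordering using RenderSignal directly (again assuming the futures crate; not the real VirtualDom flow, which also drains scheduler messages):

use std::cell::Cell;
use std::rc::Rc;

fn ordering_demo() {
    let signal = RenderSignal::default();
    let state = Rc::new(Cell::new(0));

    // An effect-like task subscribes before the render pass, just as the
    // use_effect loop does by awaiting flush_sync().
    let subscription = signal.subscribe();
    let effect = {
        let state = state.clone();
        async move {
            subscription.await;
            // The task resumes only after the "render" below has finished,
            // so it always observes the post-render value.
            assert_eq!(state.get(), 1);
        }
    };

    // The "render": complete all synchronous work first, then notify,
    // mirroring render_immediate ending with render_signal.send().
    state.set(1);
    signal.send();

    futures::executor::block_on(effect);
}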

View file

@@ -125,7 +125,7 @@ async fn flushing() {
let fut = async {
// Trigger the flush by waiting for work
for _ in 0..10 {
for _ in 0..30 {
dom.mark_dirty(ScopeId(0));
BROADCAST.with(|b| b.0.send(()).unwrap());
dom.wait_for_work().await;

View file

@@ -16,14 +16,14 @@ pub fn use_effect(mut callback: impl FnMut() + 'static) {
spawn(async move {
let rc = ReactiveContext::new_with_origin(location);
loop {
// Wait for the dom to be finished with sync work
// flush_sync().await;
// Run the effect
rc.run_in(&mut callback);
// Wait for context to change
rc.changed().await;
// Wait for the dom to be finished with sync work
flush_sync().await;
}
});
});
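
Moving flush_sync().await after rc.changed() is the heart of the fix: when a dependency changes, the effect no longer reruns immediately; it waits for the render that consumed that change to finish first. A usage sketch of the resulting guarantee (assuming the 0.5 signal API; not code from this commit):

use dioxus::prelude::*;

fn counter() -> Element {
    let mut count = use_signal(|| 0);
    use_effect(move || {
        // Reruns only after the render that displayed the new count,
        // so the rendered DOM and this closure agree on the value.
        println!("rendered with count = {}", count());
    });
    rsx! {
        button { onclick: move |_| count += 1, "{count}" }
    }
}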