Queue effects to run after flush_sync

This commit is contained in:
Jonathan Kelley 2024-01-28 02:21:05 -08:00
parent 7e4a1e9c7c
commit 1847c737e9
No known key found for this signature in database
GPG key ID: 1FBB50F7EB0A08BE
8 changed files with 137 additions and 59 deletions

View file

@ -6,7 +6,7 @@ fn main() {
fn app() -> Element {
let mut state = use_signal(|| 0);
let mut depth = use_signal(|| 1 as usize);
let mut depth = use_signal(|| 0 as usize);
let mut items = use_memo(move || (0..depth()).map(|f| f as _).collect::<Vec<isize>>());
let a = use_memo(move || state() + 1);
@ -35,14 +35,14 @@ fn Child(
return None;
}
println!("rendering child: {}", depth());
// These memos don't get re-computed when early returns happen
// In dioxus futures spawned with use_future won't progress if they don't get hit during rendering
let state = use_memo(move || state() + 1);
let item = use_memo(move || items()[dbg!(depth()) - 1]);
let item = use_memo(move || items()[depth()]);
let depth = use_memo(move || depth() - 1);
println!("rendering child: {}", depth());
rsx! {
h3 { "Depth({depth})-Item({item}): {state}"}
Child {

View file

@ -71,7 +71,7 @@ pub fn spawn_forever(fut: impl Future<Output = ()> + 'static) -> Option<Task> {
///
/// This drops the task immediately.
pub fn remove_future(id: Task) {
Runtime::with_current_scope(|cx| cx.remove_future(id));
Runtime::with(|rt| rt.remove_task(id)).expect("Runtime to exist");
}
/// Store a value between renders. The foundational hook for all other hooks.
@ -94,7 +94,6 @@ pub fn use_hook<State: Clone + 'static>(initializer: impl FnOnce() -> State) ->
Runtime::with_current_scope(|cx| cx.use_hook(initializer)).expect("to be in a dioxus runtime")
}
/// Get the current render since the inception of this component
///
/// This can be used as a helpful diagnostic when debugging hooks/renders, etc
@ -217,7 +216,6 @@ pub fn use_drop<D: FnOnce() + 'static>(destroy: D) {
});
}
/// Push a function to be run before the next render
/// This is a hook and will always run, so you can't unschedule it
/// Will run for every progression of suspense, though this might change in the future

View file

@ -234,13 +234,6 @@ impl Scope {
Runtime::with(|rt| rt.spawn(self.id, fut)).expect("Runtime to exist")
}
/// Informs the scheduler that this task is no longer needed and should be removed.
///
/// This drops the task immediately.
pub fn remove_future(&self, id: Task) {
Runtime::with(|rt| rt.remove_task(id)).expect("Runtime to exist");
}
/// Mark this component as suspended and then return None
pub fn suspend(&self) -> Option<Element> {
self.suspended.set(true);

View file

@ -45,12 +45,19 @@ impl Task {
Runtime::with(|rt| !rt.tasks.borrow()[self.0].active.get()).unwrap_or_default()
}
pub fn wake(&self) {
Runtime::with(|rt| _ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self)));
}
pub fn set_active(&self, active: bool) {
Runtime::with(|rt| rt.tasks.borrow()[self.0].active.set(active));
}
/// Resume the task.
pub fn resume(&self) {
Runtime::with(|rt| {
// set the active flag, and then ping the scheduler to ensure the task gets queued
rt.tasks.borrow()[self.0].active.set(true);
_ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self));
let was_active = rt.tasks.borrow()[self.0].active.replace(true);
});
}
}

View file

@ -55,6 +55,9 @@ macro_rules! to_owned {
};
}
mod use_callback;
pub use use_callback::*;
mod use_on_destroy;
pub use use_on_destroy::*;

View file

@ -0,0 +1,48 @@
use dioxus_core::prelude::use_hook;
use dioxus_signals::CopyValue;
use dioxus_signals::Writable;
/// A callback that's always current
///
/// Whenever this hook is called the inner callback will be replaced with the new callback but the handle will remain.
///
/// There is *currently* no signal tracking on the Callback so anything reading from it will not be updated.
///
/// This API is in flux and might not remain.
pub fn use_callback<O>(f: impl FnMut() -> O + 'static) -> UseCallback<O> {
// Create a copyvalue with no contents
// This copyvalue is generic over F so that it can be sized properly
let mut inner = use_hook(|| CopyValue::new(None));
// Every time this hook is called replace the inner callback with the new callback
inner.set(Some(f));
// And then wrap that callback in a boxed callback so we're blind to the size of the actual callback
use_hook(|| UseCallback {
inner: CopyValue::new(Box::new(move || {
inner.with_mut(|f: &mut Option<_>| f.as_mut().unwrap()())
})),
})
}
/// This callback is not generic over a return type so you can hold a bunch of callbacks at once
///
/// If you need a callback that returns a value, you can simply wrap the closure you pass in that sets a value in its scope
#[derive(PartialEq)]
pub struct UseCallback<O: 'static + ?Sized> {
inner: CopyValue<Box<dyn FnMut() -> O>>,
}
impl<O: 'static + ?Sized> Clone for UseCallback<O> {
fn clone(&self) -> Self {
Self { inner: self.inner }
}
}
// impl<O: 'static> Copy for UseCallback<O> {}
impl<O> UseCallback<O> {
/// Call the callback
pub fn call(&mut self) -> O {
self.inner.with_mut(|f| f())
}
}

View file

@ -1,62 +1,67 @@
#![allow(missing_docs)]
use dioxus_core::{
prelude::{spawn, use_drop, use_hook},
prelude::{spawn, use_before_render, use_drop, use_hook},
ScopeState, Task,
};
use dioxus_signals::*;
use dioxus_signals::{Readable, Writable};
use futures_util::{future, pin_mut, FutureExt};
use std::{any::Any, cell::Cell, future::Future, pin::Pin, rc::Rc, sync::Arc, task::Poll};
use crate::use_callback;
/// A hook that allows you to spawn a future
///
/// Does not regenerate the future when dependencies change. If you're looking for a future that does, check out
/// `use_resource` instead.
/// Does not regenerate the future when dependencies change.
pub fn use_future<F>(mut future: impl FnMut() -> F) -> UseFuture
where
F: Future + 'static,
{
let mut state = use_signal(|| UseFutureState::Pending);
let state = use_signal(|| UseFutureState::Pending);
let task = use_signal(|| {
// Create the user's task
// Create the task inside a copyvalue so we can reset it in-place later
let task = use_hook(|| {
let fut = future();
// Spawn a wrapper task that polls the innner future and watch its dependencies
let task = spawn(async move {
// move the future here and pin it so we can poll it
let fut = fut;
pin_mut!(fut);
let res = future::poll_fn(|cx| {
// Set the effect stack properly
// Poll the inner future
let ready = fut.poll_unpin(cx);
// add any dependencies to the effect stack that we need to watch when restarting the future
ready
})
.await;
// Set the value
// value.set(Some(res));
CopyValue::new(spawn(async move {
fut.await;
}))
});
Some(task)
/*
Early returns in dioxus have consequences for use_memo, use_resource, and use_future, etc
We *don't* want futures to be running if the component early returns. It's a rather weird behavior to have
use_memo running in the background even if the component isn't hitting those hooks anymore.
React solves this by simply not having early returns interleave with hooks.
However, since dioxus allows early returns (since we use them for suspense), we need to solve this problem.
*/
// Track if this *current* render is the same
let gen = use_hook(|| CopyValue::new((0, 0)));
// Early returns will pause this task, effectively
use_before_render(move || {
gen.write().0 += 1;
task.peek().set_active(false);
});
use_drop(move || {
if let Some(task) = task.take() {
task.stop();
// However when we actually run this component, we want to resume the task
task.peek().set_active(true);
gen.write().1 += 1;
// if the gens are different, we need to wake the task
if gen().0 != gen().1 {
task.peek().wake();
}
});
use_drop(move || task.peek().stop());
UseFuture { task, state }
}
pub struct UseFuture {
task: Signal<Option<Task>>,
task: CopyValue<Task>,
state: Signal<UseFutureState>,
}

View file

@ -2,7 +2,7 @@ use crate::write::*;
use core::{self, fmt::Debug};
use dioxus_core::prelude::*;
use futures_channel::mpsc::UnboundedSender;
use futures_util::StreamExt;
use futures_util::{future::Either, pin_mut, StreamExt};
use generational_box::GenerationalBoxId;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
@ -52,8 +52,31 @@ pub(crate) fn get_effect_ref() -> EffectStackRef {
None => {
let (sender, mut receiver) = futures_channel::mpsc::unbounded();
spawn_forever(async move {
while let Some(id) = receiver.next().await {
let mut queued_memos = Vec::new();
loop {
// Wait for a flush
// This gives a chance for effects to be updated in place and memos to compute their values
let flush_await = flush_sync();
pin_mut!(flush_await);
loop {
let res =
futures_util::future::select(&mut flush_await, receiver.next()).await;
match res {
Either::Right((_queued, _)) => {
if let Some(task) = _queued {
queued_memos.push(task);
}
continue;
}
Either::Left(_flushed) => break,
}
}
EFFECT_STACK.with(|stack| {
for id in queued_memos.drain(..) {
let effect_mapping = stack.effect_mapping.read();
if let Some(mut effect) = effect_mapping.get(&id).copied() {
tracing::trace!("Rerunning effect: {:?}", id);
@ -61,6 +84,7 @@ pub(crate) fn get_effect_ref() -> EffectStackRef {
} else {
tracing::trace!("Effect not found: {:?}", id);
}
}
});
}
});