Merge pull request #2029 from ealmloff/isomorphic-spawn

Fix memos in children; introduce isomorphic spawn
This commit is contained in:
Jonathan Kelley 2024-03-12 13:32:22 -07:00 committed by GitHub
commit b35f74e9d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 273 additions and 79 deletions

1
Cargo.lock generated
View file

@ -2347,6 +2347,7 @@ dependencies = [
"longest-increasing-subsequence",
"pretty_assertions",
"rand 0.8.5",
"reqwest",
"rustc-hash",
"serde",
"slab",

View file

@ -29,6 +29,7 @@ dioxus = { workspace = true }
pretty_assertions = "1.3.0"
rand = "0.8.5"
dioxus-ssr = { workspace = true }
reqwest.workspace = true
[features]
default = []

View file

@ -56,6 +56,35 @@ pub fn suspend(task: Task) -> Element {
None
}
/// Start a new future on the same thread as the rest of the VirtualDom.
///
/// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
///
/// This future will not contribute to suspense resolving but it will run during suspense.
///
/// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
///
/// ```rust, no_run
/// # use dioxus::prelude::*;
/// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
/// let mut state = use_signal(|| None);
/// spawn_isomorphic(async move {
/// state.set(Some(reqwest::get("https://api.example.com").await));
/// });
///
/// // ✅ You may wait for a signal to change and then log it
/// let mut state = use_signal(|| 0);
/// spawn_isomorphic(async move {
/// loop {
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// println!("State is {state}");
/// }
/// });
/// ```
pub fn spawn_isomorphic(fut: impl Future<Output = ()> + 'static) -> Task {
Runtime::with_current_scope(|cx| cx.spawn_isomorphic(fut)).expect("to be in a dioxus runtime")
}
/// Spawns the future but does not return the [`TaskId`]
pub fn spawn(fut: impl Future<Output = ()> + 'static) -> Task {
Runtime::with_current_scope(|cx| cx.spawn(fut)).expect("to be in a dioxus runtime")

View file

@ -91,11 +91,12 @@ pub mod prelude {
consume_context, consume_context_from_scope, current_scope_id, fc_to_builder, generation,
has_context, needs_update, needs_update_any, parent_scope, provide_context,
provide_root_context, remove_future, schedule_update, schedule_update_any, spawn,
spawn_forever, suspend, try_consume_context, use_after_render, use_before_render, use_drop,
use_error_boundary, use_hook, use_hook_with_cleanup, wait_for_next_render, AnyValue,
Attribute, Component, ComponentFunction, Element, ErrorBoundary, Event, EventHandler,
Fragment, HasAttributes, IntoAttributeValue, IntoDynNode, OptionStringFromMarker,
Properties, Runtime, RuntimeGuard, ScopeId, ScopeState, SuperFrom, SuperInto, Task,
Template, TemplateAttribute, TemplateNode, Throw, VNode, VNodeInner, VirtualDom,
spawn_forever, spawn_isomorphic, suspend, try_consume_context, use_after_render,
use_before_render, use_drop, use_error_boundary, use_hook, use_hook_with_cleanup,
wait_for_next_render, AnyValue, Attribute, Component, ComponentFunction, Element,
ErrorBoundary, Event, EventHandler, Fragment, HasAttributes, IntoAttributeValue,
IntoDynNode, OptionStringFromMarker, Properties, Runtime, RuntimeGuard, ScopeId,
ScopeState, SuperFrom, SuperInto, Task, Template, TemplateAttribute, TemplateNode, Throw,
VNode, VNodeInner, VirtualDom,
};
}

View file

@ -1,5 +1,3 @@
use rustc_hash::FxHashSet;
use crate::{
innerlude::{LocalTask, SchedulerMsg},
render_signal::RenderSignal,
@ -30,7 +28,7 @@ pub struct Runtime {
pub(crate) tasks: RefCell<slab::Slab<Rc<LocalTask>>>,
// Currently suspended tasks
pub(crate) suspended_tasks: RefCell<FxHashSet<Task>>,
pub(crate) suspended_tasks: Cell<usize>,
pub(crate) rendering: Cell<bool>,

View file

@ -72,7 +72,10 @@ impl VirtualDom {
if let Some(task) = context.last_suspendable_task.take() {
if matches!(new_nodes, RenderReturn::Aborted(_)) {
tracing::trace!("Suspending {:?} on {:?}", scope_id, task);
self.runtime.suspended_tasks.borrow_mut().insert(task);
self.runtime.tasks.borrow().get(task.0).unwrap().suspend();
self.runtime
.suspended_tasks
.set(self.runtime.suspended_tasks.get() + 1);
}
}

View file

@ -227,6 +227,37 @@ impl Scope {
.expect("Runtime to exist")
}
/// Start a new future on the same thread as the rest of the VirtualDom.
///
/// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
///
/// This future will not contribute to suspense resolving but it will run during suspense.
///
/// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
///
/// ```rust, no_run
/// # use dioxus::prelude::*;
/// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
/// let mut state = use_signal(|| None);
/// spawn_isomorphic(async move {
/// state.set(Some(reqwest::get("https://api.example.com").await));
/// });
///
/// // ✅ You may wait for a signal to change and then log it
/// let mut state = use_signal(|| 0);
/// spawn_isomorphic(async move {
/// loop {
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// println!("State is {state}");
/// }
/// });
/// ```
pub fn spawn_isomorphic(&self, fut: impl Future<Output = ()> + 'static) -> Task {
let id = Runtime::with(|rt| rt.spawn_isomorphic(self.id, fut)).expect("Runtime to exist");
self.spawned_tasks.borrow_mut().insert(id);
id
}
/// Spawns the future but does not return the [`TaskId`]
pub fn spawn(&self, fut: impl Future<Output = ()> + 'static) -> Task {
let id = Runtime::with(|rt| rt.spawn(self.id, fut)).expect("Runtime to exist");

View file

@ -81,6 +81,39 @@ impl Task {
}
impl Runtime {
/// Start a new future on the same thread as the rest of the VirtualDom.
///
/// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
///
/// This future will not contribute to suspense resolving but it will run during suspense.
///
/// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
///
/// ```rust, no_run
/// # use dioxus::prelude::*;
/// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
/// let mut state = use_signal(|| None);
/// spawn_isomorphic(async move {
/// state.set(Some(reqwest::get("https://api.example.com").await));
/// });
///
/// // ✅ You may wait for a signal to change and then log it
/// let mut state = use_signal(|| 0);
/// spawn_isomorphic(async move {
/// loop {
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// println!("State is {state}");
/// }
/// });
/// ```
pub fn spawn_isomorphic(
&self,
scope: ScopeId,
task: impl Future<Output = ()> + 'static,
) -> Task {
self.spawn_task_of_type(scope, task, TaskType::Isomorphic)
}
/// Start a new future on the same thread as the rest of the VirtualDom.
///
/// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
@ -91,6 +124,15 @@ impl Runtime {
/// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
/// will only occur when the VirtualDom itself has been dropped.
pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
self.spawn_task_of_type(scope, task, TaskType::ClientOnly)
}
fn spawn_task_of_type(
&self,
scope: ScopeId,
task: impl Future<Output = ()> + 'static,
ty: TaskType,
) -> Task {
// Insert the task, temporarily holding a borrow on the tasks map
let (task, task_id) = {
let mut tasks = self.tasks.borrow_mut();
@ -107,6 +149,7 @@ impl Runtime {
id: task_id,
tx: self.sender.clone(),
})),
ty: Cell::new(ty),
});
entry.insert(task.clone());
@ -186,8 +229,20 @@ impl Runtime {
///
/// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
self.suspended_tasks.borrow_mut().remove(&id);
self.tasks.borrow_mut().try_remove(id.0)
let task = self.tasks.borrow_mut().try_remove(id.0);
if let Some(task) = &task {
if task.suspended() {
self.suspended_tasks.set(self.suspended_tasks.get() - 1);
}
}
task
}
/// Check if a task should be run during suspense
pub(crate) fn task_runs_during_suspense(&self, task: Task) -> bool {
let borrow = self.tasks.borrow();
let task: Option<&LocalTask> = borrow.get(task.0).map(|t| &**t);
matches!(task, Some(LocalTask { ty, .. }) if ty.get().runs_during_suspense())
}
}
@ -197,9 +252,33 @@ pub(crate) struct LocalTask {
parent: Option<Task>,
task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
waker: Waker,
ty: Cell<TaskType>,
active: Cell<bool>,
}
impl LocalTask {
pub(crate) fn suspend(&self) {
self.ty.set(TaskType::Suspended);
}
pub(crate) fn suspended(&self) -> bool {
matches!(self.ty.get(), TaskType::Suspended)
}
}
#[derive(Clone, Copy)]
enum TaskType {
ClientOnly,
Suspended,
Isomorphic,
}
impl TaskType {
fn runs_during_suspense(self) -> bool {
matches!(self, TaskType::Isomorphic | TaskType::Suspended)
}
}
/// The type of message that can be sent to the scheduler.
///
/// These messages control how the scheduler will process updates to the UI.

View file

@ -468,6 +468,7 @@ impl VirtualDom {
}
/// Wait for the next event to trigger and add it to the queue
#[instrument(skip(self), level = "trace", name = "VirtualDom::wait_for_event")]
async fn wait_for_event(&mut self) {
match self.rx.next().await.expect("channel should never close") {
SchedulerMsg::Immediate(id) => self.mark_dirty(id),
@ -644,7 +645,7 @@ impl VirtualDom {
#[instrument(skip(self), level = "trace", name = "VirtualDom::wait_for_suspense")]
pub async fn wait_for_suspense(&mut self) {
loop {
if self.runtime.suspended_tasks.borrow().is_empty() {
if self.runtime.suspended_tasks.get() == 0 {
break;
}
@ -666,13 +667,17 @@ impl VirtualDom {
// We choose not to poll the deadline since we complete pretty quickly anyways
while let Some(task) = self.pop_task() {
// Then poll any tasks that might be pending
let tasks = task.tasks_queued.into_inner();
for task in tasks {
if self.runtime.suspended_tasks.borrow().contains(&task) {
let mut tasks = task.tasks_queued.into_inner();
while let Some(task) = tasks.pop() {
if self.runtime.task_runs_during_suspense(task) {
let _ = self.runtime.handle_task_wakeup(task);
// Running that task, may mark a scope higher up as dirty. If it does, return from the function early
self.queue_events();
if self.has_dirty_scopes() {
// requeue any remaining tasks
for task in tasks {
self.mark_task_dirty(task);
}
break 'wait_for_work;
}
}
@ -689,7 +694,7 @@ impl VirtualDom {
// Then, poll any tasks that might be pending in the scope
for task in work.tasks {
// During suspense, we only want to run tasks that are suspended
if self.runtime.suspended_tasks.borrow().contains(&task) {
if self.runtime.task_runs_during_suspense(task) {
let _ = self.runtime.handle_task_wakeup(task);
}
}

View file

@ -145,15 +145,7 @@ impl<T: Sync + Send + 'static> Storage<T> for SyncStorage {
#[cfg(any(debug_assertions, feature = "debug_ownership"))]
at: crate::GenerationalRefMutBorrowInfo,
) -> Result<Self::Mut<'static, T>, error::BorrowMutError> {
let write = self.0.try_write();
#[cfg(any(debug_assertions, feature = "debug_ownership"))]
let write = write.ok_or_else(|| at.borrowed_from.borrow_mut_error())?;
#[cfg(not(any(debug_assertions, feature = "debug_ownership")))]
let write = write.ok_or_else(|| {
error::BorrowMutError::AlreadyBorrowed(error::AlreadyBorrowedError {})
})?;
let write = self.0.write();
RwLockWriteGuard::try_map(write, |any| any.as_mut()?.downcast_mut())
.map_err(|_| {

View file

@ -15,6 +15,9 @@ pub fn try_use_context<T: 'static + Clone>() -> Option<T> {
///
/// Does not regenerate the value if the value is changed at the parent.
/// ```rust
/// # use dioxus::prelude::*;
/// # #[derive(Clone, Copy, PartialEq, Debug)]
/// # enum Theme { Dark, Light }
/// fn Parent() -> Element {
/// use_context_provider(|| Theme::Dark);
/// rsx! { Child {} }
@ -38,6 +41,7 @@ pub fn use_context<T: 'static + Clone>() -> T {
/// drilling, using a context provider with a Signal inside is a good way to provide global/shared
/// state in your app:
/// ```rust
/// # use dioxus::prelude::*;
///fn app() -> Element {
/// use_context_provider(|| Signal::new(0));
/// rsx! { Child {} }
@ -45,7 +49,7 @@ pub fn use_context<T: 'static + Clone>() -> T {
/// // This component does read from the signal, so when the signal changes it will rerun
///#[component]
///fn Child() -> Element {
/// let signal: Signal<i32> = use_context();
/// let mut signal: Signal<i32> = use_context();
/// rsx! {
/// button { onclick: move |_| signal += 1, "increment context" }
/// p {"{signal}"}

View file

@ -6,6 +6,7 @@ use futures_util::StreamExt;
/// effects will always run after first mount and then whenever the signal values change
/// If the use_effect call was skipped due to an early return, the effect will no longer activate.
/// ```rust
/// # use dioxus::prelude::*;
/// fn app() -> Element {
/// let mut count = use_signal(|| 0);
/// //the effect runs again each time count changes

View file

@ -13,6 +13,8 @@ use std::ops::Deref;
/// `use_future` **won't return a value**.
/// If you want to return a value from a future, use `use_resource` instead.
/// ```rust
/// # use dioxus::prelude::*;
/// # use std::time::Duration;
/// fn app() -> Element {
/// let mut count = use_signal(|| 0);
/// let mut running = use_signal(|| true);

View file

@ -15,16 +15,28 @@ use std::{cell::Cell, future::Future, rc::Rc};
/// Similar to `use_future` but `use_resource` returns a value.
/// See [`Resource`] for more details.
/// ```rust
///fn app() -> Element {
/// let country = use_signal(|| WeatherLocation {
/// city: "Berlin".to_string(),
/// country: "Germany".to_string(),
/// coordinates: (52.5244, 13.4105)
/// });
/// # use dioxus::prelude::*;
/// # #[derive(Clone)]
/// # struct WeatherLocation {
/// # city: String,
/// # country: String,
/// # coordinates: (f64, f64),
/// # }
/// # async fn get_weather(location: &WeatherLocation) -> Result<String, String> {
/// # Ok("Sunny".to_string())
/// # }
/// # #[component]
/// # fn WeatherElement (weather: String ) -> Element { rsx! { p { "The weather is {weather}" } } }
/// fn app() -> Element {
/// let country = use_signal(|| WeatherLocation {
/// city: "Berlin".to_string(),
/// country: "Germany".to_string(),
/// coordinates: (52.5244, 13.4105)
/// });
///
/// // Because the resource's future subscribes to `country` by reading it (`country.read()`),
/// // everytime `country` changes the resource's future will run again and thus provide a new value.
/// let current_weather = use_resource(move || async move { get_weather(&country.read().clone()).await });
/// let current_weather = use_resource(move || async move { get_weather(&country()).await });
///
/// rsx! {
/// // the value of the resource can be polled to
@ -32,9 +44,9 @@ use std::{cell::Cell, future::Future, rc::Rc};
/// // finished (Some(Ok(_)), errored Some(Err(_)),
/// // or is still running (None)
/// match current_weather.value() {
/// Some(Ok(weather)) => WeatherElement { weather },
/// Some(Err(e)) => p { "Loading weather failed, {e}" }
/// None => p { "Loading..." }
/// Some(Ok(weather)) => rsx! { WeatherElement { weather } },
/// Some(Err(e)) => rsx! { p { "Loading weather failed, {e}" } },
/// None => rsx! { p { "Loading..." } }
/// }
/// }
///}

View file

@ -0,0 +1,73 @@
#[tokio::test]
async fn memo_updates() {
use std::cell::RefCell;
use dioxus::prelude::*;
thread_local! {
static VEC_SIGNAL: RefCell<Option<Signal<Vec<usize>, SyncStorage>>> = RefCell::new(None);
}
fn app() -> Element {
let mut vec = use_signal_sync(|| vec![0, 1, 2]);
// Signals should update if they are changed from another thread
use_hook(|| {
VEC_SIGNAL.with(|cell| {
*cell.borrow_mut() = Some(vec);
});
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(100));
vec.push(5);
});
});
let len = vec.len();
let len_memo = use_memo(move || vec.len());
// Make sure memos that update in the middle of a component work
if generation() < 2 {
vec.push(len);
}
// The memo should always be up to date
assert_eq!(vec.len(), len_memo());
rsx! {
for i in 0..len {
Child {
index: i,
vec,
}
}
}
}
#[component]
fn Child(index: usize, vec: Signal<Vec<usize>, SyncStorage>) -> Element {
// This memo should not rerun after the element is removed
let item = use_memo(move || vec.read()[index]);
rsx! {
div { "Item: {item}" }
}
}
let mut dom = VirtualDom::new(app);
dom.rebuild_in_place();
let mut signal = VEC_SIGNAL.with(|cell| (*cell.borrow()).unwrap());
// Wait for the signal to update
for _ in 0..3 {
dom.wait_for_work().await;
dom.render_immediate(&mut dioxus::dioxus_core::NoOpMutations);
println!("Signal: {signal:?}");
}
assert_eq!(signal(), vec![0, 1, 2, 3, 4, 5]);
// Remove each element from the vec
for _ in 0..6 {
signal.pop();
dom.wait_for_work().await;
dom.render_immediate(&mut dioxus::dioxus_core::NoOpMutations);
println!("Signal: {signal:?}");
}
}

View file

@ -14,31 +14,6 @@ use futures_util::StreamExt;
use generational_box::UnsyncStorage;
use once_cell::sync::OnceCell;
/// A thread local that can only be read from the thread it was created on.
pub struct ThreadLocal<T> {
value: T,
owner: std::thread::ThreadId,
}
impl<T> ThreadLocal<T> {
/// Create a new thread local.
pub fn new(value: T) -> Self {
ThreadLocal {
value,
owner: std::thread::current().id(),
}
}
/// Get the value of the thread local.
pub fn get(&self) -> Option<&T> {
(self.owner == std::thread::current().id()).then_some(&self.value)
}
}
// SAFETY: This is safe because the thread local can only be read from the thread it was created on.
unsafe impl<T> Send for ThreadLocal<T> {}
unsafe impl<T> Sync for ThreadLocal<T> {}
struct UpdateInformation<T> {
dirty: Arc<AtomicBool>,
callback: RefCell<Box<dyn FnMut() -> T>>,
@ -70,25 +45,12 @@ impl<T: 'static> Memo<T> {
let (tx, mut rx) = futures_channel::mpsc::unbounded();
let myself: Rc<OnceCell<Memo<T>>> = Rc::new(OnceCell::new());
let thread_local = ThreadLocal::new(myself.clone());
let callback = {
let dirty = dirty.clone();
move || match thread_local.get() {
Some(memo) => match memo.get() {
Some(memo) => {
memo.recompute();
}
None => {
tracing::error!("Memo was not initialized in the same thread it was created in. This is likely a bug in dioxus");
dirty.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx.unbounded_send(());
}
},
None => {
dirty.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx.unbounded_send(());
}
move || {
dirty.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx.unbounded_send(());
}
};
let rc = ReactiveContext::new_with_callback(
@ -113,7 +75,7 @@ impl<T: 'static> Memo<T> {
};
let _ = myself.set(memo);
spawn(async move {
spawn_isomorphic(async move {
while rx.next().await.is_some() {
// Remove any pending updates
while rx.try_next().is_ok() {}