Use futures_util mutex instead of flume for sending flush lock wakeups

This commit is contained in:
Jonathan Kelley 2024-02-02 13:09:00 -08:00
parent 38bdab880d
commit 8f70a84c70
No known key found for this signature in database
GPG key ID: 1FBB50F7EB0A08BE
14 changed files with 100 additions and 43 deletions

1
Cargo.lock generated
View file

@ -2476,7 +2476,6 @@ version = "0.4.3"
dependencies = [
"dioxus",
"dioxus-ssr",
"flume",
"futures-channel",
"futures-util",
"longest-increasing-subsequence",

View file

@ -7,7 +7,7 @@ fn main() {
fn app() -> Element {
let mut breed = use_signal(|| "deerhound".to_string());
let breed_list = use_async_memo(move || async move {
let breed_list = use_resource(move || async move {
let list = reqwest::get("https://dog.ceo/api/breeds/list/all")
.await
.unwrap()
@ -44,7 +44,7 @@ fn app() -> Element {
#[component]
fn BreedPic(breed: Signal<String>) -> Element {
let fut = use_async_memo(move || async move {
let fut = use_resource(move || async move {
reqwest::get(format!("https://dog.ceo/api/breed/{breed}/images/random"))
.await
.unwrap()

View file

@ -5,7 +5,7 @@ fn main() {
}
fn app() -> Element {
let future = use_async_memo(move || async move {
let future = use_resource(move || async move {
let mut eval = eval(
r#"
dioxus.send("Hi from JS!");

View file

@ -21,10 +21,7 @@ fn app() -> Element {
if count() > 30 {
return rsx! {
h1 { "Count is too high!" }
button {
onclick: move |_| count.set(0),
"Press to reset"
}
button { onclick: move |_| count.set(0), "Press to reset" }
};
}
@ -38,8 +35,8 @@ fn app() -> Element {
}
});
// use_resource will spawn a future that resolves to a value - essentially an async memo
let _slow_count = use_async_memo(move || async move {
// use_resource will spawn a future that resolves to a value
let _slow_count = use_resource(move || async move {
tokio::time::sleep(Duration::from_millis(200)).await;
count() * 2
});

View file

@ -20,8 +20,6 @@ slab = { workspace = true }
futures-channel = { workspace = true }
tracing = { workspace = true }
serde = { version = "1", features = ["derive"], optional = true }
flume = { version = "0.11.0", default-features = false, features = ["async"]}
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }

View file

@ -1,5 +1,5 @@
use crate::{runtime::Runtime, Element, ScopeId, Task};
use futures_util::{future::poll_fn, Future};
use futures_util::Future;
use std::sync::Arc;
/// Get the current scope id
@ -257,13 +257,11 @@ pub fn after_render(f: impl FnMut() + 'static) {
/// Effects rely on this to ensure that they only run effects after the DOM has been updated. Without flush_sync effects
/// are run immediately before diffing the DOM, which causes all sorts of out-of-sync weirdness.
pub async fn flush_sync() {
// if flushing() {
// return;
// }
_ = Runtime::with(|rt| rt.flush.clone())
// Wait for the flush lock to be available
// We release it immediately, so it's impossible for the lock to be held longer than this function
Runtime::with(|rt| rt.flush_mutex.clone())
.unwrap()
.recv_async()
.lock()
.await;
}

View file

@ -1,5 +1,3 @@
use rustc_hash::FxHashSet;
use crate::{
innerlude::{LocalTask, SchedulerMsg},
scope_context::Scope,
@ -9,6 +7,7 @@ use crate::{
use std::{
cell::{Cell, Ref, RefCell},
rc::Rc,
sync::Arc,
};
thread_local! {
@ -32,18 +31,18 @@ pub struct Runtime {
pub(crate) sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
// Tasks waiting to be manually resumed when we call wait_for_work
pub(crate) flush: flume::Receiver<()>,
// the virtualdom will hold this lock while it's doing syncronous work
// when the lock is lifted, tasks waiting for the lock will be able to run
pub(crate) flush_mutex: Arc<futures_util::lock::Mutex<()>>,
pub(crate) flush_lock: Cell<Option<futures_util::lock::OwnedMutexGuard<()>>>,
}
impl Runtime {
pub(crate) fn new(
sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
flush: flume::Receiver<()>,
) -> Rc<Self> {
pub(crate) fn new(sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>) -> Rc<Self> {
Rc::new(Self {
sender,
flush,
flush_mutex: Default::default(),
flush_lock: Default::default(),
rendering: Cell::new(true),
scope_states: Default::default(),
scope_stack: Default::default(),
@ -150,6 +149,23 @@ impl Runtime {
pub(crate) fn with_scope<R>(scope: ScopeId, f: impl FnOnce(&Scope) -> R) -> Option<R> {
Self::with(|rt| rt.get_state(scope).map(|sc| f(&sc))).flatten()
}
/// Acquire the flush lock and store it interally
///
/// This means the virtual dom is currently doing syncronous work
/// The lock will be held until `release_flush_lock` is called - and then the OwnedLock will be dropped
pub(crate) fn acquire_flush_lock(&self) {
self.flush_lock
.set(Some(self.flush_mutex.try_lock_owned().unwrap()));
}
/// Release the flush lock
///
/// On the drop of the flush lock, all tasks waiting on `flush_sync` will spring to life via their wakers.
/// You can now freely poll those tasks and they can progress
pub(crate) fn release_flush_lock(&self) {
self.flush_lock.take();
}
}
/// A guard for a new runtime. This must be used to override the current runtime when importing components from a dynamic library that has it's own runtime.

View file

@ -202,7 +202,6 @@ pub struct VirtualDom {
pub(crate) suspended_scopes: FxHashSet<ScopeId>,
rx: futures_channel::mpsc::UnboundedReceiver<SchedulerMsg>,
flush_tx: flume::Sender<()>,
}
impl VirtualDom {
@ -306,12 +305,10 @@ impl VirtualDom {
/// ```
pub(crate) fn new_with_component(root: impl AnyProps + 'static) -> Self {
let (tx, rx) = futures_channel::mpsc::unbounded();
let (flush_tx, flush_rx) = flume::unbounded(); // I don't think this needs to be unbounded
let mut dom = Self {
rx,
flush_tx,
runtime: Runtime::new(tx, flush_rx),
runtime: Runtime::new(tx),
scopes: Default::default(),
dirty_scopes: Default::default(),
templates: Default::default(),
@ -430,12 +427,13 @@ impl VirtualDom {
self.poll_tasks().await;
}
/// Poll futures without progressing any futures from the flush table
///
async fn poll_tasks(&mut self) {
loop {
// Send the flush signal to the runtime, waking up tasks
_ = self.flush_tx.try_send(());
// Release the flush lock
// This will cause all the flush wakers to immediately spring to life, which we will off with process_events
self.runtime.release_flush_lock();
loop {
// Process all events - Scopes are marked dirty, etc
// Sometimes when wakers fire we get a slew of updates at once, so its important that we drain this completely
self.process_events();
@ -448,6 +446,10 @@ impl VirtualDom {
// Make sure we set the runtime since we're running user code
let _runtime = RuntimeGuard::new(self.runtime.clone());
// Hold a lock to the flush sync to prevent tasks from running in the event we get an immediate
// When we're doing awaiting the rx, the lock will be dropped and tasks waiting on the lock will get waked
self.runtime.acquire_flush_lock();
match self.rx.next().await.expect("channel should never close") {
SchedulerMsg::Immediate(id) => self.mark_dirty(id),
SchedulerMsg::TaskNotified(id) => self.runtime.handle_task_wakeup(id),
@ -546,6 +548,7 @@ impl VirtualDom {
// Process any events that might be pending in the queue
// Signals marked with .write() need a chance to be handled by the effect driver
// This also processes futures which might progress into immediates
self.process_events();
// Next, diff any dirty scopes

View file

@ -21,6 +21,26 @@ use element::DioxusTUIMutationWriter;
pub use plasmo::{query::Query, Config, RenderingMode, Size, TuiContext};
use plasmo::{render, Driver};
pub mod launch {
use super::*;
pub type Config = super::Config;
/// Launches the WebView and runs the event loop, with configuration and root props.
pub fn launch(
root: fn() -> Element,
contexts: Vec<Box<dyn Fn() -> Box<dyn Any>>>,
platform_config: Config,
) {
let mut virtual_dom = VirtualDom::new(root);
for context in contexts {
virtual_dom.insert_any_root_context(context());
}
launch_vdom_cfg(virtual_dom, platform_config)
}
}
pub fn launch(app: fn() -> Element) {
launch_cfg(app, Config::default())
}

View file

@ -76,6 +76,16 @@ impl LaunchBuilder {
}
}
#[cfg(feature = "tui")]
/// Launch your tui application
pub fn tui() -> LaunchBuilder<dioxus_tui::Config, UnsendContext> {
LaunchBuilder {
launch_fn: dioxus_tui::launch::launch,
contexts: Vec::new(),
platform_config: None,
}
}
/// Provide a custom launch function for your application.
///
/// Useful for third party renderers to tap into the launch builder API without having to reimplement it.
@ -215,3 +225,9 @@ pub fn launch_desktop(app: fn() -> Element) {
pub fn launch_fullstack(app: fn() -> Element) {
LaunchBuilder::fullstack().launch(app)
}
#[cfg(feature = "tui")]
/// Launch your tui application without any additional configuration. See [`LaunchBuilder`] for more options.
pub fn launch_tui(app: fn() -> Element) {
LaunchBuilder::tui().launch(app)
}

View file

@ -1,7 +1,5 @@
use crate::use_hook_did_run;
use dioxus_core::prelude::*;
use dioxus_signals::{CopyValue, Effect, ReactiveContext, Writable};
// use futures_util::select;
use dioxus_signals::ReactiveContext;
/// Create a new effect. The effect will be run immediately and whenever any signal it reads changes.
/// The signal will be owned by the current component and will be dropped when the component is dropped.
@ -16,6 +14,9 @@ pub fn use_effect(mut callback: impl FnMut() + 'static) {
let rc = ReactiveContext::new(None);
loop {
// Wait for the dom the be finished with sync work
flush_sync().await;
// Run the effect
rc.run_in(|| callback());

View file

@ -56,6 +56,9 @@ pub fn use_maybe_sync_memo<R: PartialEq, S: Storage<SignalData<R>>>(
spawn(async move {
loop {
// Wait for the dom the be finished with sync work
flush_sync().await;
rc.changed().await;
let new = rc.run_in(|| f());
if new != *state.peek() {

View file

@ -2,7 +2,7 @@
use crate::{use_callback, use_signal};
use dioxus_core::{
prelude::{spawn, suspend, use_hook},
prelude::{flush_sync, spawn, suspend, use_hook},
Task,
};
use dioxus_signals::*;
@ -13,7 +13,7 @@ use std::future::Future;
///
/// Regular memos are synchronous and resolve immediately. However, you might want to resolve a memo
#[must_use = "Consider using `cx.spawn` to run a future without reading its value"]
pub fn use_async_memo<T, F>(future: impl Fn() -> F + 'static) -> AsyncMemo<T>
pub fn use_resource<T, F>(future: impl Fn() -> F + 'static) -> AsyncMemo<T>
where
T: 'static,
F: Future<Output = T> + 'static,
@ -28,6 +28,9 @@ where
// Spawn a wrapper task that polls the innner future and watch its dependencies
spawn(async move {
// Wait for the dom the be finished with sync work
flush_sync().await;
// move the future here and pin it so we can poll it
let fut = fut;
pin_mut!(fut);
@ -88,7 +91,9 @@ impl<T> AsyncMemo<T> {
// Manually set the value in the future slot without starting the future over
pub fn set(&mut self, new_value: T) {
todo!()
// if let Some(task) = self.task.take() {
// cx.remove_future(task);
// }
// self.value.set(Some(new_value));
}

View file

@ -69,6 +69,7 @@ impl TuiContext {
}
pub fn quit(&self) {
// panic!("ack")
self.tx.unbounded_send(InputEvent::Close).unwrap();
}
@ -258,7 +259,7 @@ pub fn render<R: Driver>(
TermEvent::Resize(_, _) => updated = true,
_ => {}
},
InputEvent::Close => break,
InputEvent::Close => panic!("ackkkk"),
};
if let InputEvent::UserInput(evt) = evt.unwrap() {