fix suspense

This commit is contained in:
Evan Almloff 2023-07-17 16:48:54 -07:00
parent 913d1f0491
commit dd5d974aeb
12 changed files with 351 additions and 110 deletions

View file

@ -83,12 +83,12 @@ impl VirtualDom {
id: scope.id,
});
if matches!(allocated, RenderReturn::Aborted(_)) {
if scope.suspended.get() {
if scope.suspended.get() {
if matches!(allocated, RenderReturn::Aborted(_)) {
self.suspended_scopes.insert(scope.id);
} else if !self.suspended_scopes.is_empty() {
_ = self.suspended_scopes.remove(&scope.id);
}
} else {
_ = self.suspended_scopes.remove(&scope.id);
}
// rebind the lifetime now that its stored internally

View file

@ -16,6 +16,9 @@ serde = "1.0.159"
execute = "0.2.12"
tower-http = { version = "0.4.1", features = ["auth"] }
simple_logger = "4.2.0"
wasm-logger = "0.2.0"
log.workspace = true
cargo-cache = "0.8.3"
[features]
default = []

View file

@ -16,31 +16,30 @@ struct AppProps {
}
fn app(cx: Scope<AppProps>) -> Element {
let state1 = server_cached(|| {
#[cfg(not(feature = "ssr"))]
panic!();
12345
});
assert_eq!(state1, 12345);
let state2 = server_cached(|| {
#[cfg(not(feature = "ssr"))]
panic!();
123456
});
assert_eq!(state2, 123456);
let state3 = server_cached(|| {
#[cfg(not(feature = "ssr"))]
panic!();
1234567
});
assert_eq!(state3, 1234567);
render! {
Child {}
}
}
let mut count = use_state(cx, || cx.props.count);
fn Child(cx: Scope) -> Element {
let state = use_server_future(cx, (), |()| async move {
#[cfg(not(feature = "ssr"))]
panic!();
#[cfg(feature = "ssr")]
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
return 1;
})?;
log::info!("running child");
let state = state.value();
log::info!("child state: {:?}", state);
let mut count = use_state(cx, || 0);
let text = use_state(cx, || "...".to_string());
cx.render(rsx! {
div {
"Server state: {state1}, {state2}, {state3}"
"Server state: {state}"
}
h1 { "High-Five counter: {count}" }
button { onclick: move |_| count += 1, "Up high!" }
@ -77,6 +76,11 @@ async fn get_server_data() -> Result<String, ServerFnError> {
}
fn main() {
#[cfg(feature = "web")]
wasm_logger::init(wasm_logger::Config::default());
#[cfg(feature = "ssr")]
simple_logger::SimpleLogger::new().init().unwrap();
launch!(@([127, 0, 0, 1], 8080), app, {
serve_cfg: ServeConfigBuilder::new(app, AppProps { count: 0 }),
});

View file

@ -0,0 +1,2 @@
pub mod server_cached;
pub mod server_future;

View file

@ -17,7 +17,7 @@ use serde::{de::DeserializeOwned, Serialize};
/// }));
/// }
/// ```
pub fn from_server<O: 'static + Serialize + DeserializeOwned>(server_fn: impl Fn() -> O) -> O {
pub fn server_cached<O: 'static + Serialize + DeserializeOwned>(server_fn: impl Fn() -> O) -> O {
#[cfg(feature = "ssr")]
{
let data = server_fn();

View file

@ -0,0 +1,146 @@
use dioxus::prelude::*;
use serde::{de::DeserializeOwned, Serialize};
use std::any::Any;
use std::cell::Cell;
use std::cell::Ref;
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
/// A future that resolves to a value.
///
/// This runs the future only once - though the future may be regenerated
/// through the [`UseServerFuture::restart`] method.
///
/// This is commonly used for components that cannot be rendered until some
/// asynchronous operation has completed.
///
/// Whenever the hooks dependencies change, the future will be re-evaluated.
/// If a future is pending when the dependencies change, the previous future
/// will be allowed to continue
///
/// - dependencies: a tuple of references to values that are PartialEq + Clone
pub fn use_server_future<T, F, D>(
cx: &ScopeState,
dependencies: D,
future: impl FnOnce(D::Out) -> F,
) -> Option<&UseServerFuture<T>>
where
T: 'static + Serialize + DeserializeOwned,
F: Future<Output = T> + 'static,
D: UseFutureDep,
{
let state = cx.use_hook(move || UseServerFuture {
update: cx.schedule_update(),
needs_regen: Cell::new(true),
value: Default::default(),
task: Cell::new(None),
dependencies: Vec::new(),
});
let first_run = { state.value.borrow().as_ref().is_none() };
if dependencies.clone().apply(&mut state.dependencies) || state.needs_regen.get() {
// We don't need regen anymore
state.needs_regen.set(false);
// Create the new future
let fut = future(dependencies.out());
// Clone in our cells
let value = state.value.clone();
let schedule_update = state.update.clone();
// Cancel the current future
if let Some(current) = state.task.take() {
cx.remove_future(current);
}
state.task.set(Some(cx.push_future(async move {
let data;
#[cfg(feature = "ssr")]
{
data = fut.await;
if first_run {
if let Err(err) = crate::prelude::server_context().push_html_data(&data) {
log::error!("Failed to push HTML data: {}", err);
};
}
}
#[cfg(not(feature = "ssr"))]
{
data = match crate::html_storage::deserialize::take_server_data() {
Some(data) => data,
None => fut.await,
};
}
*value.borrow_mut() = Some(Box::new(data));
schedule_update();
})));
}
if first_run {
log::trace!("Suspending first run of use_server_future");
cx.suspend();
None
} else {
Some(state)
}
}
pub enum FutureState<'a, T> {
Pending,
Complete(&'a T),
Regenerating(&'a T), // the old value
}
pub struct UseServerFuture<T> {
update: Arc<dyn Fn()>,
needs_regen: Cell<bool>,
task: Cell<Option<TaskId>>,
dependencies: Vec<Box<dyn Any>>,
value: Rc<RefCell<Option<Box<T>>>>,
}
pub enum UseFutureState<'a, T> {
Pending,
Complete(&'a T),
Reloading(&'a T),
}
impl<T> UseServerFuture<T> {
/// Restart the future with new dependencies.
///
/// Will not cancel the previous future, but will ignore any values that it
/// generates.
pub fn restart(&self) {
self.needs_regen.set(true);
(self.update)();
}
/// Forcefully cancel a future
pub fn cancel(&self, cx: &ScopeState) {
if let Some(task) = self.task.take() {
cx.remove_future(task);
}
}
/// Return any value, even old values if the future has not yet resolved.
///
/// If the future has never completed, the returned value will be `None`.
pub fn value(&self) -> Ref<'_, T> {
Ref::map(self.value.borrow(), |v| v.as_deref().unwrap())
}
/// Get the ID of the future in Dioxus' internal scheduler
pub fn task(&self) -> Option<TaskId> {
self.task.get()
}
/// Get the current state of the future.
pub fn reloading(&self) -> bool {
self.task.get().is_some()
}
}

View file

@ -14,6 +14,7 @@ pub mod router;
mod adapters;
#[cfg(feature = "ssr")]
pub use adapters::*;
mod hooks;
#[cfg(all(debug_assertions, feature = "hot-reload", feature = "ssr"))]
mod hot_reload;
pub mod launch;
@ -26,7 +27,6 @@ mod serve_config;
#[cfg(feature = "ssr")]
mod server_context;
mod server_fn;
mod use_server;
/// A prelude of commonly used items in dioxus-fullstack.
pub mod prelude {
@ -36,6 +36,7 @@ pub mod prelude {
pub use crate::adapters::salvo_adapter::*;
#[cfg(feature = "warp")]
pub use crate::adapters::warp_adapter::*;
use crate::hooks;
#[cfg(not(feature = "ssr"))]
pub use crate::html_storage::deserialize::get_root_props_from_document;
#[cfg(all(feature = "ssr", feature = "router"))]
@ -53,12 +54,11 @@ pub mod prelude {
pub use crate::server_fn::DioxusServerFn;
#[cfg(feature = "ssr")]
pub use crate::server_fn::{ServerFnMiddleware, ServerFnTraitObj, ServerFunction};
use crate::use_server;
pub use crate::{launch, launch_router};
pub use dioxus_server_macro::*;
#[cfg(feature = "ssr")]
pub use dioxus_ssr::incremental::IncrementalRendererConfig;
pub use server_fn::{self, ServerFn as _, ServerFnError};
pub use use_server::from_server;
pub use hooks::{server_cached::server_cached, server_future::use_server_future};
}

View file

@ -2,19 +2,22 @@
use std::sync::Arc;
use crate::server_context::SERVER_CONTEXT;
use dioxus::prelude::VirtualDom;
use dioxus_ssr::{
incremental::{IncrementalRendererConfig, RenderFreshness, WrapBody},
Renderer,
};
use serde::Serialize;
use std::sync::RwLock;
use tokio::task::spawn_blocking;
use crate::{prelude::*, server_context::with_server_context};
use dioxus::prelude::*;
enum SsrRendererPool {
Renderer(object_pool::Pool<Renderer>),
Incremental(object_pool::Pool<dioxus_ssr::incremental::IncrementalRenderer>),
Renderer(RwLock<Vec<Renderer>>),
Incremental(RwLock<Vec<dioxus_ssr::incremental::IncrementalRenderer>>),
}
impl SsrRendererPool {
@ -24,49 +27,130 @@ impl SsrRendererPool {
route: String,
component: Component<P>,
props: P,
to: &mut WriteBuffer,
server_context: &DioxusServerContext,
) -> Result<RenderFreshness, dioxus_ssr::incremental::IncrementalRendererError> {
) -> Result<(RenderFreshness, String), dioxus_ssr::incremental::IncrementalRendererError> {
let wrapper = FullstackRenderer {
cfg,
cfg: cfg.clone(),
server_context: server_context.clone(),
};
match self {
Self::Renderer(pool) => {
let server_context = Box::new(server_context.clone());
let mut vdom = VirtualDom::new_with_props(component, props);
let mut renderer = pool.write().unwrap().pop().unwrap_or_else(pre_renderer);
with_server_context(server_context, || {
let _ = vdom.rebuild();
let (tx, rx) = tokio::sync::oneshot::channel();
spawn_blocking(move || {
tokio::runtime::Runtime::new()
.expect("couldn't spawn runtime")
.block_on(async move {
let mut vdom = VirtualDom::new_with_props(component, props);
let mut to = WriteBuffer { buffer: Vec::new() };
// before polling the future, we need to set the context
let prev_context =
SERVER_CONTEXT.with(|ctx| ctx.replace(server_context));
// poll the future, which may call server_context()
log::info!("Rebuilding vdom");
let _ = vdom.rebuild();
vdom.wait_for_suspense().await;
log::info!("Suspense resolved");
// after polling the future, we need to restore the context
SERVER_CONTEXT.with(|ctx| ctx.replace(prev_context));
if let Err(err) = wrapper.render_before_body(&mut *to) {
let _ = tx.send(Err(err));
return;
}
if let Err(err) = renderer.render_to(&mut to, &vdom) {
let _ = tx.send(Err(
dioxus_router::prelude::IncrementalRendererError::RenderError(
err,
),
));
return;
}
if let Err(err) = wrapper.render_after_body(&mut *to) {
let _ = tx.send(Err(err));
return;
}
match String::from_utf8(to.buffer) {
Ok(html) => {
let _ =
tx.send(Ok((renderer, RenderFreshness::now(None), html)));
}
Err(err) => {
dioxus_ssr::incremental::IncrementalRendererError::Other(
Box::new(err),
);
}
}
});
});
let mut renderer = pool.pull(pre_renderer);
// SAFETY: The fullstack renderer will only write UTF-8 to the buffer.
wrapper.render_before_body(&mut **to)?;
renderer.render_to(to, &vdom)?;
wrapper.render_after_body(&mut **to)?;
Ok(RenderFreshness::now(None))
let (renderer, freshness, html) = rx.await.unwrap()?;
pool.write().unwrap().push(renderer);
Ok((freshness, html))
}
Self::Incremental(pool) => {
let mut renderer =
pool.pull(|| incremental_pre_renderer(cfg.incremental.as_ref().unwrap()));
Ok(renderer
.render(
route,
component,
props,
&mut **to,
|vdom| {
let server_context = Box::new(server_context.clone());
with_server_context(server_context, || {
let _ = vdom.rebuild();
});
},
&wrapper,
)
.await?)
pool.write().unwrap().pop().unwrap_or_else(|| {
incremental_pre_renderer(cfg.incremental.as_ref().unwrap())
});
let (tx, rx) = tokio::sync::oneshot::channel();
let server_context = server_context.clone();
spawn_blocking(move || {
tokio::runtime::Runtime::new()
.expect("couldn't spawn runtime")
.block_on(async move {
let mut to = WriteBuffer { buffer: Vec::new() };
match renderer
.render(
route,
component,
props,
&mut *to,
|vdom| {
Box::pin(async move {
// before polling the future, we need to set the context
let prev_context = SERVER_CONTEXT
.with(|ctx| ctx.replace(Box::new(server_context)));
// poll the future, which may call server_context()
log::info!("Rebuilding vdom");
let _ = vdom.rebuild();
vdom.wait_for_suspense().await;
log::info!("Suspense resolved");
// after polling the future, we need to restore the context
SERVER_CONTEXT.with(|ctx| ctx.replace(prev_context));
})
},
&wrapper,
)
.await
{
Ok(freshness) => {
match String::from_utf8(to.buffer).map_err(|err| {
dioxus_ssr::incremental::IncrementalRendererError::Other(
Box::new(err),
)
}) {
Ok(html) => {
let _ = tx.send(Ok((freshness, html)));
}
Err(err) => {
let _ = tx.send(Err(err));
}
}
}
Err(err) => {
let _ = tx.send(Err(err));
}
}
})
});
let (freshness, html) = rx.await.unwrap()?;
Ok((freshness, html))
}
}
}
@ -83,18 +167,22 @@ impl SSRState {
pub(crate) fn new<P: Clone>(cfg: &ServeConfig<P>) -> Self {
if cfg.incremental.is_some() {
return Self {
renderers: Arc::new(SsrRendererPool::Incremental(object_pool::Pool::new(
10,
|| incremental_pre_renderer(cfg.incremental.as_ref().unwrap()),
))),
renderers: Arc::new(SsrRendererPool::Incremental(RwLock::new(vec![
incremental_pre_renderer(cfg.incremental.as_ref().unwrap()),
incremental_pre_renderer(cfg.incremental.as_ref().unwrap()),
incremental_pre_renderer(cfg.incremental.as_ref().unwrap()),
incremental_pre_renderer(cfg.incremental.as_ref().unwrap()),
]))),
};
}
Self {
renderers: Arc::new(SsrRendererPool::Renderer(object_pool::Pool::new(
10,
pre_renderer,
))),
renderers: Arc::new(SsrRendererPool::Renderer(RwLock::new(vec![
pre_renderer(),
pre_renderer(),
pre_renderer(),
pre_renderer(),
]))),
}
}
@ -109,31 +197,25 @@ impl SSRState {
> + Send
+ 'a {
async move {
let mut html = WriteBuffer { buffer: Vec::new() };
let ServeConfig { app, props, .. } = cfg;
let freshness = self
let (freshness, html) = self
.renderers
.render_to(cfg, route, *app, props.clone(), &mut html, server_context)
.render_to(cfg, route, *app, props.clone(), server_context)
.await?;
Ok(RenderResponse {
html: String::from_utf8(html.buffer).map_err(|err| {
dioxus_ssr::incremental::IncrementalRendererError::Other(Box::new(err))
})?,
freshness,
})
Ok(RenderResponse { html, freshness })
}
}
}
struct FullstackRenderer<'a, P: Clone + Send + Sync + 'static> {
cfg: &'a ServeConfig<P>,
struct FullstackRenderer<P: Clone + Send + Sync + 'static> {
cfg: ServeConfig<P>,
server_context: DioxusServerContext,
}
impl<'a, P: Clone + Serialize + Send + Sync + 'static> dioxus_ssr::incremental::WrapBody
for FullstackRenderer<'a, P>
impl<P: Clone + Serialize + Send + Sync + 'static> dioxus_ssr::incremental::WrapBody
for FullstackRenderer<P>
{
fn render_before_body<R: std::io::Write>(
&self,
@ -258,7 +340,10 @@ where
Rt: dioxus_router::prelude::Routable + Send + Sync + Serialize,
<Rt as std::str::FromStr>::Err: std::fmt::Display,
{
let wrapper = FullstackRenderer { cfg };
let wrapper = FullstackRenderer {
cfg: cfg.clone(),
server_context: Default::default(),
};
let mut renderer = incremental_pre_renderer(
cfg.incremental
.as_ref()

View file

@ -123,7 +123,7 @@ mod server_fn_impl {
}
std::thread_local! {
static SERVER_CONTEXT: std::cell::RefCell<Box<DioxusServerContext>> = std::cell::RefCell::new(Box::new(DioxusServerContext::default() ));
pub(crate) static SERVER_CONTEXT: std::cell::RefCell<Box<DioxusServerContext>> = std::cell::RefCell::new(Box::new(DioxusServerContext::default() ));
}
/// Get information about the current server request.

View file

@ -1,4 +1,6 @@
//! Exentsions to the incremental renderer to support pre-caching static routes.
use core::pin::Pin;
use std::future::Future;
use std::str::FromStr;
use dioxus::prelude::*;
@ -47,7 +49,9 @@ where
route,
&mut tokio::io::sink(),
|vdom| {
let _ = vdom.rebuild();
Box::pin(async move {
let _ = vdom.wait_for_suspense().await;
})
},
wrapper,
)
@ -65,7 +69,12 @@ where
}
/// Render a route to a writer.
pub async fn render_route<R: WrapBody + Send + Sync, Rt, W, F: FnOnce(&mut VirtualDom)>(
pub async fn render_route<
R: WrapBody + Send + Sync,
Rt,
W,
F: FnOnce(&mut VirtualDom) -> Pin<Box<dyn Future<Output = ()> + '_>>,
>(
renderer: &mut IncrementalRenderer,
route: Rt,
writer: &mut W,

View file

@ -1,14 +1,11 @@
#![allow(non_snake_case)]
use std::{
ops::{Deref, DerefMut},
path::{PathBuf},
time::{Duration},
path::PathBuf,
time::Duration,
};
/// Information about the freshness of a rendered response
#[derive(Debug, Clone, Copy)]
pub struct RenderFreshness {

View file

@ -6,10 +6,12 @@ use crate::fs_cache::ValidCachedPath;
use dioxus_core::{Element, Scope, VirtualDom};
use rustc_hash::FxHasher;
use std::{
future::Future,
hash::BuildHasherDefault,
io::Write,
ops::{Deref, DerefMut},
path::{PathBuf},
path::PathBuf,
pin::Pin,
time::{Duration, SystemTime},
};
use tokio::io::{AsyncWrite, AsyncWriteExt, BufReader};
@ -67,36 +69,29 @@ impl IncrementalRenderer {
self.invalidate_after.is_some()
}
fn render_and_cache<'a, P: 'static, R: WrapBody + Send + Sync>(
async fn render_and_cache<'a, P: 'static, R: WrapBody + Send + Sync>(
&'a mut self,
route: String,
comp: fn(Scope<P>) -> Element,
props: P,
output: &'a mut (impl AsyncWrite + Unpin + Send),
rebuild_with: impl FnOnce(&mut VirtualDom),
rebuild_with: impl FnOnce(&mut VirtualDom) -> Pin<Box<dyn Future<Output = ()> + '_>>,
renderer: &'a R,
) -> impl std::future::Future<Output = Result<RenderFreshness, IncrementalRendererError>> + 'a + Send
{
) -> Result<RenderFreshness, IncrementalRendererError> {
let mut html_buffer = WriteBuffer { buffer: Vec::new() };
let result_1;
let result2;
{
let mut vdom = VirtualDom::new_with_props(comp, props);
rebuild_with(&mut vdom);
rebuild_with(&mut vdom).await;
result_1 = renderer.render_before_body(&mut *html_buffer);
result2 = self.ssr_renderer.render_to(&mut html_buffer, &vdom);
renderer.render_before_body(&mut *html_buffer)?;
self.ssr_renderer.render_to(&mut html_buffer, &vdom)?;
}
async move {
result_1?;
result2?;
renderer.render_after_body(&mut *html_buffer)?;
let html_buffer = html_buffer.buffer;
renderer.render_after_body(&mut *html_buffer)?;
let html_buffer = html_buffer.buffer;
output.write_all(&html_buffer).await?;
output.write_all(&html_buffer).await?;
self.add_to_cache(route, html_buffer)
}
self.add_to_cache(route, html_buffer)
}
fn add_to_cache(
@ -178,7 +173,7 @@ impl IncrementalRenderer {
component: fn(Scope<P>) -> Element,
props: P,
output: &mut (impl AsyncWrite + Unpin + std::marker::Send),
rebuild_with: impl FnOnce(&mut VirtualDom),
rebuild_with: impl FnOnce(&mut VirtualDom) -> Pin<Box<dyn Future<Output = ()> + '_>>,
renderer: &R,
) -> Result<RenderFreshness, IncrementalRendererError> {
// check if this route is cached