feat: get liveview working across the boar

This commit is contained in:
Jonathan Kelley 2022-12-15 19:57:48 -08:00
parent 7790d2c065
commit b95069619f
9 changed files with 232 additions and 315 deletions

View file

@ -51,4 +51,16 @@ salvo = { version = "0.32.0", features = ["affix", "ws"] }
tower = "0.4.12"
[features]
default = ["warp"]
default = ["salvo"]
[[example]]
name = "axum"
required-features = ["axum"]
[[example]]
name = "salvo"
required-features = ["salvo"]
[[example]]
name = "warp"
required-features = ["warp"]

View file

@ -1,32 +1,53 @@
#[cfg(not(feature = "axum"))]
fn main() {}
use axum::{extract::ws::WebSocketUpgrade, response::Html, routing::get, Router};
use dioxus::prelude::*;
fn app(cx: Scope) -> Element {
let mut num = use_state(cx, || 0);
cx.render(rsx! {
div {
"hello axum! {num}"
button { onclick: move |_| num += 1, "Increment" }
}
})
}
#[cfg(feature = "axum")]
#[tokio::main]
async fn main() {
use axum::{extract::ws::WebSocketUpgrade, response::Html, routing::get, Router};
use dioxus_core::{Element, LazyNodes, Scope};
pretty_env_logger::init();
fn app(cx: Scope) -> Element {
cx.render(LazyNodes::new(|f| f.text(format_args!("hello world!"))))
}
let addr: std::net::SocketAddr = ([127, 0, 0, 1], 3030).into();
let view = dioxus_liveview::new(addr);
let body = view.body("<title>Dioxus Liveview</title>");
let view = dioxus_liveview::LiveViewPool::new();
let app = Router::new()
.route("/", get(move || async { Html(body) }))
.route(
"/app",
"/",
get(move || async move {
Html(format!(
r#"
<!DOCTYPE html>
<html>
<head> <title>Dioxus LiveView with Warp</title> </head>
<body> <div id="main"></div> </body>
{glue}
</html>
"#,
glue = dioxus_liveview::interpreter_glue(&format!("ws://{addr}/ws"))
))
}),
)
.route(
"/ws",
get(move |ws: WebSocketUpgrade| async move {
ws.on_upgrade(move |socket| async move {
view.upgrade_axum(socket, app).await;
_ = view.launch(dioxus_liveview::axum_socket(socket), app).await;
})
}),
);
println!("Listening on http://{}", addr);
axum::Server::bind(&addr.to_string().parse().unwrap())
.serve(app.into_make_service())
.await

View file

@ -1,55 +1,71 @@
#[cfg(not(feature = "salvo"))]
fn main() {}
use dioxus::prelude::*;
use dioxus_liveview::LiveViewPool;
use salvo::extra::affix;
use salvo::extra::ws::WsHandler;
use salvo::prelude::*;
use std::net::SocketAddr;
use std::sync::Arc;
fn app(cx: Scope) -> Element {
let mut num = use_state(cx, || 0);
cx.render(rsx! {
div {
"hello salvo! {num}"
button { onclick: move |_| num += 1, "Increment" }
}
})
}
#[cfg(feature = "salvo")]
#[tokio::main]
async fn main() {
use std::sync::Arc;
use dioxus_core::{Element, LazyNodes, Scope};
use dioxus_liveview as liveview;
use dioxus_liveview::LiveView;
use salvo::extra::affix;
use salvo::extra::ws::WsHandler;
use salvo::prelude::*;
fn app(cx: Scope) -> Element {
cx.render(LazyNodes::new(|f| f.text(format_args!("hello world!"))))
}
pretty_env_logger::init();
let addr = ([127, 0, 0, 1], 3030);
let addr: SocketAddr = ([127, 0, 0, 1], 3030).into();
let view = LiveViewPool::new();
// todo: compactify this routing under one liveview::app method
let view = liveview::new(addr);
let router = Router::new()
.hoop(affix::inject(Arc::new(view)))
.get(index)
.push(Router::with_path("app").get(connect));
.push(Router::with_path("ws").get(connect));
println!("Listening on http://{}", addr);
Server::new(TcpListener::bind(addr)).serve(router).await;
#[handler]
fn index(depot: &mut Depot, res: &mut Response) {
let view = depot.obtain::<Arc<Liveview>>().unwrap();
let body = view.body("<title>Dioxus LiveView</title>");
res.render(Text::Html(body));
}
#[handler]
async fn connect(
req: &mut Request,
depot: &mut Depot,
res: &mut Response,
) -> Result<(), StatusError> {
let view = depot.obtain::<Arc<Liveview>>().unwrap().clone();
let fut = WsHandler::new().handle(req, res)?;
let fut = async move {
if let Some(ws) = fut.await {
view.upgrade_salvo(ws, app).await;
}
};
tokio::task::spawn(fut);
Ok(())
}
}
#[handler]
fn index(_depot: &mut Depot, res: &mut Response) {
let addr: SocketAddr = ([127, 0, 0, 1], 3030).into();
res.render(Text::Html(format!(
r#"
<!DOCTYPE html>
<html>
<head> <title>Dioxus LiveView with Warp</title> </head>
<body> <div id="main"></div> </body>
{glue}
</html>
"#,
glue = dioxus_liveview::interpreter_glue(&format!("ws://{addr}/ws"))
)));
}
#[handler]
async fn connect(
req: &mut Request,
depot: &mut Depot,
res: &mut Response,
) -> Result<(), StatusError> {
let view = depot.obtain::<Arc<LiveViewPool>>().unwrap().clone();
let fut = WsHandler::new().handle(req, res)?;
tokio::spawn(async move {
if let Some(ws) = fut.await {
_ = view.launch(dioxus_liveview::salvo_socket(ws), app).await;
}
});
Ok(())
}

View file

@ -1,5 +1,6 @@
use dioxus::prelude::*;
use dioxus_liveview::LiveView;
use dioxus_liveview::adapters::warp_adapter::warp_socket;
use dioxus_liveview::LiveViewPool;
use std::net::SocketAddr;
use warp::ws::Ws;
use warp::Filter;
@ -9,7 +10,7 @@ fn app(cx: Scope) -> Element {
cx.render(rsx! {
div {
"hello world! {num}"
"hello warp! {num}"
button {
onclick: move |_| num += 1,
"Increment"
@ -38,15 +39,14 @@ async fn main() {
))
});
let view = LiveView::new();
let pool = LiveViewPool::new();
let ws = warp::path("ws")
.and(warp::ws())
.and(warp::any().map(move || view.clone()))
.map(move |ws: Ws, view: LiveView| {
println!("Got a connection!");
.and(warp::any().map(move || pool.clone()))
.map(move |ws: Ws, pool: LiveViewPool| {
ws.on_upgrade(|ws| async move {
let _ = view.upgrade_warp(ws, app).await;
let _ = pool.launch(warp_socket(ws), app).await;
})
});

View file

@ -1,94 +1,24 @@
use crate::events;
use crate::{LiveViewError, LiveViewSocket};
use axum::extract::ws::{Message, WebSocket};
use dioxus_core::prelude::*;
use futures_util::{
future::{select, Either},
pin_mut, SinkExt, StreamExt,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::task::LocalPoolHandle;
use futures_util::{SinkExt, StreamExt};
impl crate::Liveview {
pub async fn upgrade_axum(&self, ws: WebSocket, app: fn(Scope) -> Element) {
connect(ws, self.pool.clone(), app, ()).await;
}
pub async fn upgrade_axum_with_props<T>(
&self,
ws: WebSocket,
app: fn(Scope<T>) -> Element,
props: T,
) where
T: Send + Sync + 'static,
{
connect(ws, self.pool.clone(), app, props).await;
}
/// Convert a warp websocket into a LiveViewSocket
///
/// This is required to launch a LiveView app using the warp web framework
pub fn axum_socket(ws: WebSocket) -> impl LiveViewSocket {
ws.map(transform_rx)
.with(transform_tx)
.sink_map_err(|_| LiveViewError::SendingFailed)
}
pub async fn connect<T>(
socket: WebSocket,
pool: LocalPoolHandle,
app: fn(Scope<T>) -> Element,
props: T,
) where
T: Send + Sync + 'static,
{
let (mut user_ws_tx, mut user_ws_rx) = socket.split();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (edits_tx, edits_rx) = mpsc::unbounded_channel();
let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
let mut event_rx = UnboundedReceiverStream::new(event_rx);
let vdom_fut = pool.clone().spawn_pinned(move || async move {
let mut vdom = VirtualDom::new_with_props(app, props);
let edits = vdom.rebuild();
let serialized = serde_json::to_string(&edits.edits).unwrap();
edits_tx.send(serialized).unwrap();
loop {
let new_event = {
let vdom_fut = vdom.wait_for_work();
pin_mut!(vdom_fut);
match select(event_rx.next(), vdom_fut).await {
Either::Left((l, _)) => l,
Either::Right((_, _)) => None,
}
};
if let Some(new_event) = new_event {
vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
} else {
let mutations = vdom.work_with_deadline(|| false);
for mutation in mutations {
let edits = serde_json::to_string(&mutation.edits).unwrap();
edits_tx.send(edits).unwrap();
}
}
}
});
loop {
match select(user_ws_rx.next(), edits_rx.next()).await {
Either::Left((l, _)) => {
if let Some(Ok(msg)) = l {
if let Ok(Some(msg)) = msg.to_text().map(events::parse_ipc_message) {
let user_event = events::trigger_from_serialized(msg.params);
event_tx.send(user_event).unwrap();
} else {
break;
}
} else {
break;
}
}
Either::Right((edits, _)) => {
if let Some(edits) = edits {
// send the edits to the client
if user_ws_tx.send(Message::Text(edits)).await.is_err() {
break;
}
} else {
break;
}
}
}
}
vdom_fut.abort();
fn transform_rx(message: Result<Message, axum::Error>) -> Result<String, LiveViewError> {
message
.map_err(|_| LiveViewError::SendingFailed)?
.into_text()
.map_err(|_| LiveViewError::SendingFailed)
}
async fn transform_tx(message: String) -> Result<Message, axum::Error> {
Ok(Message::Text(message))
}

View file

@ -1,110 +1,25 @@
use crate::events;
use dioxus_core::prelude::*;
use futures_util::{pin_mut, SinkExt, StreamExt};
use futures_util::{SinkExt, StreamExt};
use salvo::extra::ws::{Message, WebSocket};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::task::LocalPoolHandle;
impl crate::Liveview {
pub async fn upgrade_salvo(&self, ws: salvo::extra::ws::WebSocket, app: fn(Scope) -> Element) {
connect(ws, self.pool.clone(), app, ()).await;
}
pub async fn upgrade_salvo_with_props<T>(
&self,
ws: salvo::extra::ws::WebSocket,
app: fn(Scope<T>) -> Element,
props: T,
) where
T: Send + Sync + 'static,
{
connect(ws, self.pool.clone(), app, props).await;
}
use crate::{LiveViewError, LiveViewSocket};
/// Convert a salvo websocket into a LiveViewSocket
///
/// This is required to launch a LiveView app using the warp web framework
pub fn salvo_socket(ws: WebSocket) -> impl LiveViewSocket {
ws.map(transform_rx)
.with(transform_tx)
.sink_map_err(|_| LiveViewError::SendingFailed)
}
pub async fn connect<T>(
ws: WebSocket,
pool: LocalPoolHandle,
app: fn(Scope<T>) -> Element,
props: T,
) where
T: Send + Sync + 'static,
{
// Use a counter to assign a new unique ID for this user.
fn transform_rx(message: Result<Message, salvo::Error>) -> Result<String, LiveViewError> {
let as_bytes = message.map_err(|_| LiveViewError::SendingFailed)?;
// Split the socket into a sender and receive of messages.
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
let msg = String::from_utf8(as_bytes.into_bytes()).map_err(|_| LiveViewError::SendingFailed)?;
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (edits_tx, edits_rx) = mpsc::unbounded_channel();
let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
let mut event_rx = UnboundedReceiverStream::new(event_rx);
let vdom_fut = pool.spawn_pinned(move || async move {
let mut vdom = VirtualDom::new_with_props(app, props);
let edits = vdom.rebuild();
let serialized = serde_json::to_string(&edits.edits).unwrap();
edits_tx.send(serialized).unwrap();
loop {
use futures_util::future::{select, Either};
let new_event = {
let vdom_fut = vdom.wait_for_work();
pin_mut!(vdom_fut);
match select(event_rx.next(), vdom_fut).await {
Either::Left((l, _)) => l,
Either::Right((_, _)) => None,
}
};
if let Some(new_event) = new_event {
vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
} else {
let mutations = vdom.work_with_deadline(|| false);
for mutation in mutations {
let edits = serde_json::to_string(&mutation.edits).unwrap();
edits_tx.send(edits).unwrap();
}
}
}
});
loop {
use futures_util::future::{select, Either};
match select(user_ws_rx.next(), edits_rx.next()).await {
Either::Left((l, _)) => {
if let Some(Ok(msg)) = l {
if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
if msg.method == "user_event" {
let user_event = events::trigger_from_serialized(msg.params);
event_tx.send(user_event).unwrap();
}
} else {
break;
}
} else {
break;
}
}
Either::Right((edits, _)) => {
if let Some(edits) = edits {
// send the edits to the client
if user_ws_tx.send(Message::text(edits)).await.is_err() {
break;
}
} else {
break;
}
}
}
}
vdom_fut.abort();
Ok(msg)
}
async fn transform_tx(message: String) -> Result<Message, salvo::Error> {
Ok(Message::text(message))
}

View file

@ -1,46 +1,21 @@
use crate::{liveview_eventloop, LiveView, LiveViewError};
use dioxus_core::prelude::*;
use crate::{LiveViewError, LiveViewSocket};
use futures_util::{SinkExt, StreamExt};
use warp::ws::{Message, WebSocket};
impl LiveView {
pub async fn upgrade_warp(
self,
ws: WebSocket,
app: fn(Scope<()>) -> Element,
) -> Result<(), LiveViewError> {
self.upgrade_warp_with_props(ws, app, ()).await
}
pub async fn upgrade_warp_with_props<T: Send + 'static>(
self,
ws: WebSocket,
app: fn(Scope<T>) -> Element,
props: T,
) -> Result<(), LiveViewError> {
let (ws_tx, ws_rx) = ws.split();
let ws_tx = ws_tx
.with(transform_warp)
.sink_map_err(|_| LiveViewError::SendingFailed);
let ws_rx = ws_rx.map(transform_warp_rx);
match self
.pool
.spawn_pinned(move || liveview_eventloop(app, props, ws_tx, ws_rx))
.await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_) => Err(LiveViewError::SendingFailed),
}
}
/// Convert a warp websocket into a LiveViewSocket
///
/// This is required to launch a LiveView app using the warp web framework
pub fn warp_socket(ws: WebSocket) -> impl LiveViewSocket {
ws.map(transform_rx)
.with(transform_tx)
.sink_map_err(|_| LiveViewError::SendingFailed)
}
fn transform_warp_rx(f: Result<Message, warp::Error>) -> Result<String, LiveViewError> {
fn transform_rx(message: Result<Message, warp::Error>) -> Result<String, LiveViewError> {
// destructure the message into the buffer we got from warp
let msg = f.map_err(|_| LiveViewError::SendingFailed)?.into_bytes();
let msg = message
.map_err(|_| LiveViewError::SendingFailed)?
.into_bytes();
// transform it back into a string, saving us the allocation
let msg = String::from_utf8(msg).map_err(|_| LiveViewError::SendingFailed)?;
@ -48,6 +23,6 @@ fn transform_warp_rx(f: Result<Message, warp::Error>) -> Result<String, LiveView
Ok(msg)
}
async fn transform_warp(message: String) -> Result<Message, warp::Error> {
async fn transform_tx(message: String) -> Result<Message, warp::Error> {
Ok(Message::text(message))
}

View file

@ -1,14 +1,23 @@
pub mod adapters {
#[cfg(feature = "warp")]
pub mod warp_adapter;
#[cfg(feature = "warp")]
pub use warp_adapter::*;
#[cfg(feature = "axum")]
pub mod axum_adapter;
#[cfg(feature = "axum")]
pub use axum_adapter::*;
#[cfg(feature = "salvo")]
pub mod salvo_adapter;
#[cfg(feature = "salvo")]
pub use salvo_adapter::*;
}
pub use adapters::*;
pub mod pool;
use futures_util::{SinkExt, StreamExt};
pub use pool::*;
@ -28,6 +37,10 @@ pub enum LiveViewError {
use dioxus_interpreter_js::INTERPRETER_JS;
static MAIN_JS: &str = include_str!("./main.js");
/// This script that gets injected into your app connects this page to the websocket endpoint
///
/// Once the endpoint is connected, it will send the initial state of the app, and then start
/// processing user events and returning edits to the liveview instance
pub fn interpreter_glue(url: &str) -> String {
format!(
r#"

View file

@ -6,22 +6,60 @@ use std::time::Duration;
use tokio_util::task::LocalPoolHandle;
#[derive(Clone)]
pub struct LiveView {
pub struct LiveViewPool {
pub(crate) pool: LocalPoolHandle,
}
impl Default for LiveView {
impl Default for LiveViewPool {
fn default() -> Self {
Self::new()
}
}
impl LiveView {
impl LiveViewPool {
pub fn new() -> Self {
LiveView {
LiveViewPool {
pool: LocalPoolHandle::new(16),
}
}
pub async fn launch(
&self,
ws: impl LiveViewSocket,
app: fn(Scope<()>) -> Element,
) -> Result<(), LiveViewError> {
self.launch_with_props(ws, app, ()).await
}
pub async fn launch_with_props<T: Send + 'static>(
&self,
ws: impl LiveViewSocket,
app: fn(Scope<T>) -> Element,
props: T,
) -> Result<(), LiveViewError> {
match self.pool.spawn_pinned(move || run(app, props, ws)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_) => Err(LiveViewError::SendingFailed),
}
}
}
/// A LiveViewSocket is a Sink and Stream of Strings that Dioxus uses to communicate with the client
pub trait LiveViewSocket:
SinkExt<String, Error = LiveViewError>
+ StreamExt<Item = Result<String, LiveViewError>>
+ Send
+ 'static
{
}
impl<S> LiveViewSocket for S where
S: SinkExt<String, Error = LiveViewError>
+ StreamExt<Item = Result<String, LiveViewError>>
+ Send
+ 'static
{
}
/// The primary event loop for the VirtualDom waiting for user input
@ -31,11 +69,10 @@ impl LiveView {
/// As long as your framework can provide a Sink and Stream of Strings, you can use this function.
///
/// You might need to transform the error types of the web backend into the LiveView error type.
pub async fn liveview_eventloop<T>(
pub async fn run<T>(
app: Component<T>,
props: T,
ws_tx: impl SinkExt<String, Error = LiveViewError>,
ws_rx: impl StreamExt<Item = Result<String, LiveViewError>>,
ws: impl LiveViewSocket,
) -> Result<(), LiveViewError>
where
T: Send + 'static,
@ -46,10 +83,10 @@ where
let edits = serde_json::to_string(&vdom.rebuild()).unwrap();
// pin the futures so we can use select!
pin_mut!(ws_tx);
pin_mut!(ws_rx);
pin_mut!(ws);
ws_tx.send(edits).await?;
// send the initial render to the client
ws.send(edits).await?;
// desktop uses this wrapper struct thing around the actual event itself
// this is sorta driven by tao/wry
@ -63,17 +100,15 @@ where
// poll any futures or suspense
_ = vdom.wait_for_work() => {}
evt = ws_rx.next() => {
evt = ws.next() => {
match evt {
Some(Ok(evt)) => {
let event: IpcMessage = serde_json::from_str(&evt).unwrap();
let event = event.params;
vdom.handle_event(&event.name, event.data.into_any(), event.element, event.bubbles);
}
Some(Err(_e)) => {
// log this I guess?
// when would we get an error here?
if let Ok(IpcMessage { params }) = serde_json::from_str::<IpcMessage>(&evt) {
vdom.handle_event(&params.name, params.data.into_any(), params.element, params.bubbles);
}
}
// log this I guess? when would we get an error here?
Some(Err(_e)) => {},
None => return Ok(()),
}
}
@ -83,6 +118,6 @@ where
.render_with_deadline(tokio::time::sleep(Duration::from_millis(10)))
.await;
ws_tx.send(serde_json::to_string(&edits).unwrap()).await?;
ws.send(serde_json::to_string(&edits).unwrap()).await?;
}
}