feat: liveview working with warp

This commit is contained in:
Jonathan Kelley 2022-03-15 01:02:44 -04:00
parent 71c96a8053
commit 71184b51a0
14 changed files with 257 additions and 375 deletions

View file

@ -58,7 +58,8 @@ members = [
"packages/mobile",
"packages/interpreter",
"packages/fermi",
"packages/tui"
"packages/tui",
"packages/liveview",
]
[dev-dependencies]

View file

@ -0,0 +1,5 @@
{
"rust-analyzer.cargo.features": [
"warp"
]
}

View file

@ -6,32 +6,29 @@ repository = "https://github.com/DioxusLabs/dioxus/"
homepage = "https://dioxuslabs.com"
documentation = "https://dioxuslabs.com"
keywords = ["dom", "ui", "gui", "react", "wasm"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3"
futures-util = { version = "0.3", default-features = false, features = [
"sink",
] }
futures-channel = { version = "0.3.17", features = ["sink"] }
pretty_env_logger = "0.4"
tokio-stream = { version = "0.1.1", features = ["net"] }
dioxus-core = { path = "../core", features = ["serialize"] }
dioxus-html = { path = "../html", features = ["serialize"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
tokio-util = { version = "0.7.0", features = ["full"] }
# axum = { version = "0.4.2", optional = true, features = ["ws", "headers"] }
# serde = { version = "1.0.136", features = ["derive"] }
# serde_json = "1.0.79"
dioxus-html = { path = "../html", features = ["serialize"] }
dioxus-core = { path = "../core", features = ["serialize"] }
# warp
warp = { version = "0.3", optional = true }
[features]
# default = ["axum"]
[dev-dependencies]
# tokio = { version = "1.14.0", features = ["full"] }
# tracing = "0.1"
# tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# tower-http = { version = "0.2.0", features = ["fs", "trace"] }
# headers = "0.3"
default = []

View file

@ -1,26 +0,0 @@
[package]
name = "cloud"
version = "0.0.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3"
futures-util = { version = "0.3", default-features = false, features = [
"sink",
] }
futures-channel = { version = "0.3.17", features = ["sink"] }
pretty_env_logger = "0.4"
tokio-stream = { version = "0.1.1", features = ["net"] }
dioxus = { git = "https://github.com/dioxuslabs/dioxus" }
dioxus-html = { git = "https://github.com/dioxuslabs/dioxus", features = [
"serialize",
] }
dioxus-core = { git = "https://github.com/dioxuslabs/dioxus", features = [
"serialize",
] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"

View file

@ -1,96 +0,0 @@
//! Example websocket server.
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p example-websockets
//! ```
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
TypedHeader,
},
http::StatusCode,
response::IntoResponse,
routing::{get, get_service},
Router,
};
use std::net::SocketAddr;
use tower_http::{
services::ServeDir,
trace::{DefaultMakeSpan, TraceLayer},
};
#[tokio::main]
async fn main() {
// Set the RUST_LOG, if it hasn't been explicitly defined
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "example_websockets=debug,tower_http=debug")
}
tracing_subscriber::fmt::init();
// build our application with some routes
let app = Router::new()
.fallback(
get_service(
ServeDir::new("examples/axum_assets").append_index_html_on_directories(true),
)
.handle_error(|error: std::io::Error| async move {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled internal error: {}", error),
)
}),
)
// routes are matched from bottom to top, so we have to put `nest` at the
// top since it matches all routes
.route("/ws", get(ws_handler))
// logging so we can see whats going on
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
// run it with hyper
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn ws_handler(
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
) -> impl IntoResponse {
if let Some(TypedHeader(user_agent)) = user_agent {
println!("`{}` connected", user_agent.as_str());
}
ws.on_upgrade(handle_socket)
}
async fn handle_socket(mut socket: WebSocket) {
if let Some(msg) = socket.recv().await {
if let Ok(msg) = msg {
println!("Client says: {:?}", msg);
} else {
println!("client disconnected");
return;
}
}
loop {
if socket
.send(Message::Text(String::from("Hi!")))
.await
.is_err()
{
println!("client disconnected");
return;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

View file

@ -1 +0,0 @@
<script src='script.js'></script>

View file

@ -1,9 +0,0 @@
const socket = new WebSocket('ws://localhost:3000/ws');
socket.addEventListener('open', function (event) {
socket.send('Hello Server!');
});
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});

View file

@ -0,0 +1,32 @@
use dioxus_core::{Element, LazyNodes, Scope};
use dioxus_liveview as liveview;
use warp::ws::Ws;
use warp::Filter;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let addr = ([127, 0, 0, 1], 3030);
// todo: compactify this routing under one liveview::app method
let view = liveview::new(addr);
let body = view.body();
let routes = warp::path::end()
.map(move || warp::reply::html(body.clone()))
.or(warp::path("app")
.and(warp::ws())
.and(warp::any().map(move || view.clone()))
.map(|ws: Ws, view: liveview::Liveview| {
ws.on_upgrade(|socket| async move {
view.upgrade(socket, app).await;
})
}));
warp::serve(routes).run(addr).await;
}
fn app(cx: Scope) -> Element {
cx.render(LazyNodes::new(|f| f.text(format_args!("hello world!"))))
}

View file

@ -0,0 +1,91 @@
use crate::events;
use dioxus_core::prelude::*;
use futures_util::{pin_mut, SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::task::LocalPoolHandle;
use warp::ws::{Message, WebSocket};
impl crate::Liveview {
pub async fn upgrade(&self, ws: warp::ws::WebSocket, app: fn(Scope) -> Element) {
connect(ws, self.pool.clone(), app).await;
}
}
pub async fn connect(ws: WebSocket, pool: LocalPoolHandle, app: fn(Scope) -> Element) {
// Use a counter to assign a new unique ID for this user.
// Split the socket into a sender and receive of messages.
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (edits_tx, edits_rx) = mpsc::unbounded_channel();
let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
let mut event_rx = UnboundedReceiverStream::new(event_rx);
let vdom_fut = pool.spawn_pinned(move || async move {
let mut vdom = VirtualDom::new(app);
let edits = vdom.rebuild();
let serialized = serde_json::to_string(&edits.edits).unwrap();
edits_tx.send(serialized).unwrap();
loop {
use futures_util::future::{select, Either};
let new_event = {
let vdom_fut = vdom.wait_for_work();
pin_mut!(vdom_fut);
match select(event_rx.next(), vdom_fut).await {
Either::Left((l, _)) => l,
Either::Right((_, _)) => None,
}
};
if let Some(new_event) = new_event {
vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
} else {
let mutations = vdom.work_with_deadline(|| false);
for mutation in mutations {
let edits = serde_json::to_string(&mutation.edits).unwrap();
edits_tx.send(edits).unwrap();
}
}
}
});
loop {
use futures_util::future::{select, Either};
match select(user_ws_rx.next(), edits_rx.next()).await {
Either::Left((l, _)) => {
if let Some(Ok(msg)) = l {
if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
let user_event = events::trigger_from_serialized(msg.params);
event_tx.send(user_event).unwrap();
} else {
break;
}
} else {
break;
}
}
Either::Right((edits, _)) => {
if let Some(edits) = edits {
// send the edits to the client
if user_ws_tx.send(Message::text(edits)).await.is_err() {
break;
}
} else {
break;
}
}
}
}
vdom_fut.abort();
}

View file

@ -12,20 +12,10 @@ pub(crate) struct IpcMessage {
pub params: serde_json::Value,
}
impl IpcMessage {
pub(crate) fn method(&self) -> &str {
self.method.as_str()
}
pub(crate) fn params(self) -> serde_json::Value {
self.params
}
}
pub(crate) fn parse_ipc_message(payload: &str) -> Option<IpcMessage> {
match serde_json::from_str(payload) {
Ok(message) => Some(message),
Err(e) => None,
Err(_) => None,
}
}

View file

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html>
<head>
<title>Dioxus app</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
</head>
<body>
<div id="main"></div>
<script>
import("./index.js").then(function (module) {
module.main();
});
</script>
</body>
</html>

View file

@ -1,11 +1,41 @@
export function main() {
function main() {
let root = window.document.getElementById("main");
if (root != null) {
window.interpreter = new Interpreter(root);
window.ipc.postMessage(serializeIpcMessage("initialize"));
// create a new ipc
window.ipc = new IPC(root);
window.ipc.send(serializeIpcMessage("initialize"));
}
}
export class Interpreter {
class IPC {
constructor(root) {
// connect to the websocket
window.interpreter = new Interpreter(root);
this.ws = new WebSocket(WS_ADDR);
this.ws.onopen = () => {
console.log("Connected to the websocket");
};
this.ws.onerror = (err) => {
console.error("Error: ", err);
};
this.ws.onmessage = (event) => {
let edits = JSON.parse(event.data);
window.interpreter.handleEdits(edits);
};
}
send(msg) {
this.ws.send(msg);
}
}
class Interpreter {
constructor(root) {
this.root = root;
this.stack = [root];
@ -207,7 +237,7 @@ export class Interpreter {
event.preventDefault();
const href = target.getAttribute("href");
if (href !== "" && href !== null && href !== undefined) {
window.ipc.postMessage(
window.ipc.send(
serializeIpcMessage("browser_open", { href })
);
}
@ -263,7 +293,7 @@ export class Interpreter {
if (realId == null) {
return;
}
window.ipc.postMessage(
window.ipc.send(
serializeIpcMessage("user_event", {
event: edit.event_name,
mounted_dom_id: parseInt(realId),
@ -287,7 +317,7 @@ export class Interpreter {
}
}
export function serialize_event(event) {
function serialize_event(event) {
switch (event.type) {
case "copy":
case "cut":

View file

@ -0,0 +1,63 @@
pub(crate) mod events;
pub mod adapters {
#[cfg(feature = "warp")]
pub mod warp_adapter;
#[cfg(feature = "axum")]
pub mod axum_adapter;
#[cfg(feature = "actix")]
pub mod actix_adapter;
}
use std::net::SocketAddr;
#[cfg(feature = "warp")]
pub use adapters::warp_adapter::connect;
#[cfg(feature = "axum")]
pub use adapters::axum_adapter::connect;
#[cfg(feature = "actix")]
pub use adapters::actix_adapter::connect;
use tokio_util::task::LocalPoolHandle;
#[derive(Clone)]
pub struct Liveview {
pool: LocalPoolHandle,
addr: String,
}
impl Liveview {
pub fn body(&self) -> String {
format!(
r#"
<!DOCTYPE html>
<html>
<head>
<title>Dioxus app</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
</head>
<body>
<div id="main"></div>
<script>
var WS_ADDR = "ws://{addr}/app";
{interpreter}
main();
</script>
</body>
</html>"#,
addr = self.addr,
interpreter = include_str!("../src/interpreter.js")
)
}
}
pub fn new(addr: impl Into<SocketAddr>) -> Liveview {
let addr: SocketAddr = addr.into();
Liveview {
pool: LocalPoolHandle::new(16),
addr: addr.to_string(),
}
}

View file

@ -1,210 +0,0 @@
// #![deny(warnings)]
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures_util::{pin_mut, SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;
mod events;
/// Our global unique user id counter.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// Our state of currently connected users.
///
/// - Key is their id
/// - Value is a sender of `warp::ws::Message`
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let state = Users::default();
let chat = warp::path("chat")
.and(warp::ws())
.and(warp::any().map(move || state.clone()))
.map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users)));
let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
let routes = index.or(chat);
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
async fn user_connected(ws: WebSocket, users: Users) {
// Use a counter to assign a new unique ID for this user.
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
eprintln!("new chat user: {}", my_id);
// Split the socket into a sender and receive of messages.
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (edits_tx, edits_rx) = mpsc::unbounded_channel();
let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
let mut event_rx = UnboundedReceiverStream::new(event_rx);
tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
use dioxus::prelude::*;
fn app(cx: Scope) -> Element {
let (count, set_count) = use_state(&cx, || 0);
cx.render(rsx! {
div { "hello world: {count}" }
button {
onclick: move |_| set_count(count + 1),
"increment"
}
})
}
let mut vdom = VirtualDom::new(app);
let edits = vdom.rebuild();
let serialized = serde_json::to_string(&edits.edits).unwrap();
edits_tx.send(serialized).unwrap();
loop {
use futures_util::future::{select, Either};
let new_event = {
let vdom_fut = vdom.wait_for_work();
pin_mut!(vdom_fut);
match select(event_rx.next(), vdom_fut).await {
Either::Left((l, _)) => l,
Either::Right((_, _)) => None,
}
};
if let Some(new_event) = new_event {
vdom.handle_message(dioxus::core::SchedulerMsg::Event(new_event));
} else {
let mutations = vdom.work_with_deadline(|| false);
for mutation in mutations {
let edits = serde_json::to_string(&mutation.edits).unwrap();
edits_tx.send(edits).unwrap();
}
}
}
})
});
loop {
use futures_util::future::{select, Either};
match select(user_ws_rx.next(), edits_rx.next()).await {
Either::Left((l, _)) => {
if let Some(Ok(msg)) = l {
if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
let user_event = events::trigger_from_serialized(msg.params);
event_tx.send(user_event).unwrap();
}
}
}
Either::Right((edits, _)) => {
if let Some(edits) = edits {
// send the edits to the client
if user_ws_tx.send(Message::text(edits)).await.is_err() {
break;
}
}
}
}
}
// log::info!("");
}
async fn user_message(my_id: usize, msg: Message, users: &Users) {
// Skip any non-Text messages...
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", my_id, msg);
// New message from this user, send it to everyone else (except same uid)...
for (&uid, tx) in users.read().await.iter() {
if my_id != uid {
if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
// The tx is disconnected, our `user_disconnected` code
// should be happening in another task, nothing more to
// do here.
}
}
}
}
async fn user_disconnected(my_id: usize, users: &Users) {
eprintln!("good bye user: {}", my_id);
// Stream closed up, so remove from the user list
users.write().await.remove(&my_id);
}
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>Warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
const chat = document.getElementById('chat');
const text = document.getElementById('text');
const uri = 'ws://' + location.host + '/chat';
const ws = new WebSocket(uri);
function message(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
message(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
send.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;