From 60cf7ca0257d1e26419c38999a350ae9933d78b7 Mon Sep 17 00:00:00 2001 From: Liam Gallagher Date: Sat, 28 Sep 2024 08:09:46 +1200 Subject: [PATCH] Refactor BRP to allow for 3rd-party transports (#15438) ## Objective Closes #15408 (somewhat) ## Solution - Moved the existing HTTP transport to its own module with its own plugin (`RemoteHttpPlugin`) (disabled on WASM) - Swapped out the `smol` crate for the smaller crates it re-exports to make it easier to keep out non-wasm code (HTTP transport needs `async-io` which can't build on WASM) - Added a new public `BrpSender` resource holding the matching sender for the `BrpReceiver`' (formally `BrpMailbox`). This allows other crates to send `BrpMessage`'s to the "mailbox". ## Testing TODO --------- Co-authored-by: Matty --- crates/bevy_remote/Cargo.toml | 3 +- crates/bevy_remote/src/http.rs | 246 +++++++++++++++++++++++++++++++ crates/bevy_remote/src/lib.rs | 259 ++++----------------------------- examples/remote/client.rs | 4 +- examples/remote/server.rs | 6 +- 5 files changed, 285 insertions(+), 233 deletions(-) create mode 100644 crates/bevy_remote/src/http.rs diff --git a/crates/bevy_remote/Cargo.toml b/crates/bevy_remote/Cargo.toml index 7b6a199dad..471cfe3536 100644 --- a/crates/bevy_remote/Cargo.toml +++ b/crates/bevy_remote/Cargo.toml @@ -27,10 +27,11 @@ hyper = { version = "1", features = ["server", "http1"] } serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } http-body-util = "0.1" +async-channel = "2" # dependencies that will not compile on wasm [target.'cfg(not(target_family = "wasm"))'.dependencies] -smol = "2" +async-io = "2" smol-hyper = "0.1" [lints] diff --git a/crates/bevy_remote/src/http.rs b/crates/bevy_remote/src/http.rs new file mode 100644 index 0000000000..e05c297810 --- /dev/null +++ b/crates/bevy_remote/src/http.rs @@ -0,0 +1,246 @@ +//! The BRP transport using JSON-RPC over HTTP. +//! +//! Adding the [`RemoteHttpPlugin`] to your [`App`] causes Bevy to accept +//! connections over HTTP (by default, on port 15702) while your app is running. +//! +//! Clients are expected to `POST` JSON requests to the root URL; see the `client` +//! example for a trivial example of use. + +#![cfg(not(target_family = "wasm"))] + +use crate::{error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpSender}; +use anyhow::Result as AnyhowResult; +use async_channel::Sender; +use async_io::Async; +use bevy_app::{App, Plugin, Startup}; +use bevy_ecs::system::{Res, Resource}; +use bevy_tasks::IoTaskPool; +use core::net::{IpAddr, Ipv4Addr}; +use http_body_util::{BodyExt as _, Full}; +use hyper::{ + body::{Bytes, Incoming}, + server::conn::http1, + service, Request, Response, +}; +use serde_json::Value; +use smol_hyper::rt::{FuturesIo, SmolTimer}; +use std::net::TcpListener; +use std::net::TcpStream; + +/// The default port that Bevy will listen on. +/// +/// This value was chosen randomly. +pub const DEFAULT_PORT: u16 = 15702; + +/// The default host address that Bevy will use for its server. +pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + +/// Add this plugin to your [`App`] to allow remote connections over HTTP to inspect and modify entities. +/// It requires the [`RemotePlugin`](super::RemotePlugin). +/// +/// This BRP transport cannot be used when targeting WASM. +/// +/// The defaults are: +/// - [`DEFAULT_ADDR`] : 127.0.0.1. +/// - [`DEFAULT_PORT`] : 15702. +pub struct RemoteHttpPlugin { + /// The address that Bevy will bind to. + address: IpAddr, + /// The port that Bevy will listen on. + port: u16, +} + +impl Default for RemoteHttpPlugin { + fn default() -> Self { + Self { + address: DEFAULT_ADDR, + port: DEFAULT_PORT, + } + } +} + +impl Plugin for RemoteHttpPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HostAddress(self.address)) + .insert_resource(HostPort(self.port)) + .add_systems(Startup, start_http_server); + } +} + +impl RemoteHttpPlugin { + /// Set the IP address that the server will use. + #[must_use] + pub fn with_address(mut self, address: impl Into) -> Self { + self.address = address.into(); + self + } + + /// Set the remote port that the server will listen on. + #[must_use] + pub fn with_port(mut self, port: u16) -> Self { + self.port = port; + self + } +} + +/// A resource containing the IP address that Bevy will host on. +/// +/// Currently, changing this while the application is running has no effect; this merely +/// reflects the IP address that is set during the setup of the [`RemoteHttpPlugin`]. +#[derive(Debug, Resource)] +pub struct HostAddress(pub IpAddr); + +/// A resource containing the port number that Bevy will listen on. +/// +/// Currently, changing this while the application is running has no effect; this merely +/// reflects the host that is set during the setup of the [`RemoteHttpPlugin`]. +#[derive(Debug, Resource)] +pub struct HostPort(pub u16); + +/// A system that starts up the Bevy Remote Protocol HTTP server. +fn start_http_server( + request_sender: Res, + address: Res, + remote_port: Res, +) { + IoTaskPool::get() + .spawn(server_main( + address.0, + remote_port.0, + request_sender.clone(), + )) + .detach(); +} + +/// The Bevy Remote Protocol server main loop. +async fn server_main( + address: IpAddr, + port: u16, + request_sender: Sender, +) -> AnyhowResult<()> { + listen( + Async::::bind((address, port))?, + &request_sender, + ) + .await +} + +async fn listen( + listener: Async, + request_sender: &Sender, +) -> AnyhowResult<()> { + loop { + let (client, _) = listener.accept().await?; + + let request_sender = request_sender.clone(); + IoTaskPool::get() + .spawn(async move { + let _ = handle_client(client, request_sender).await; + }) + .detach(); + } +} + +async fn handle_client( + client: Async, + request_sender: Sender, +) -> AnyhowResult<()> { + http1::Builder::new() + .timer(SmolTimer::new()) + .serve_connection( + FuturesIo::new(client), + service::service_fn(|request| process_request_batch(request, &request_sender)), + ) + .await?; + + Ok(()) +} + +/// A helper function for the Bevy Remote Protocol server that handles a batch +/// of requests coming from a client. +async fn process_request_batch( + request: Request, + request_sender: &Sender, +) -> AnyhowResult>> { + let batch_bytes = request.into_body().collect().await?.to_bytes(); + let batch: Result = serde_json::from_slice(&batch_bytes); + + let serialized = match batch { + Ok(BrpBatch::Single(request)) => { + serde_json::to_string(&process_single_request(request, request_sender).await?)? + } + Ok(BrpBatch::Batch(requests)) => { + let mut responses = Vec::new(); + + for request in requests { + responses.push(process_single_request(request, request_sender).await?); + } + + serde_json::to_string(&responses)? + } + Err(err) => { + let err = BrpResponse::new( + None, + Err(BrpError { + code: error_codes::INVALID_REQUEST, + message: err.to_string(), + data: None, + }), + ); + + serde_json::to_string(&err)? + } + }; + + Ok(Response::new(Full::new(Bytes::from( + serialized.as_bytes().to_owned(), + )))) +} + +/// A helper function for the Bevy Remote Protocol server that processes a single +/// request coming from a client. +async fn process_single_request( + request: Value, + request_sender: &Sender, +) -> AnyhowResult { + // Reach in and get the request ID early so that we can report it even when parsing fails. + let id = request.as_object().and_then(|map| map.get("id")).cloned(); + + let request: BrpRequest = match serde_json::from_value(request) { + Ok(v) => v, + Err(err) => { + return Ok(BrpResponse::new( + id, + Err(BrpError { + code: error_codes::INVALID_REQUEST, + message: err.to_string(), + data: None, + }), + )); + } + }; + + if request.jsonrpc != "2.0" { + return Ok(BrpResponse::new( + id, + Err(BrpError { + code: error_codes::INVALID_REQUEST, + message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"), + data: None, + }), + )); + } + + let (result_sender, result_receiver) = async_channel::bounded(1); + + let _ = request_sender + .send(BrpMessage { + method: request.method, + params: request.params, + sender: result_sender, + }) + .await; + + let result = result_receiver.recv().await?; + Ok(BrpResponse::new(request.id, result)) +} diff --git a/crates/bevy_remote/src/lib.rs b/crates/bevy_remote/src/lib.rs index 581d9752fb..8f11a18adb 100644 --- a/crates/bevy_remote/src/lib.rs +++ b/crates/bevy_remote/src/lib.rs @@ -1,11 +1,10 @@ -//! An implementation of the Bevy Remote Protocol over HTTP and JSON, to allow -//! for remote control of a Bevy app. +//! An implementation of the Bevy Remote Protocol, to allow for remote control of a Bevy app. //! -//! Adding the [`RemotePlugin`] to your [`App`] causes Bevy to accept -//! connections over HTTP (by default, on port 15702) while your app is running. -//! These *remote clients* can inspect and alter the state of the -//! entity-component system. Clients are expected to `POST` JSON requests to the -//! root URL; see the `client` example for a trivial example of use. +//! Adding the [`RemotePlugin`] to your [`App`] will setup everything needed without +//! starting any transports. To start accepting remote connections you will need to +//! add a second plugin like the [`RemoteHttpPlugin`](http::RemoteHttpPlugin) to enable communication +//! over HTTP. These *remote clients* can inspect and alter the state of the +//! entity-component system. //! //! The Bevy Remote Protocol is based on the JSON-RPC 2.0 protocol. //! @@ -245,66 +244,31 @@ //! [fully-qualified type names]: bevy_reflect::TypePath::type_path //! [fully-qualified type name]: bevy_reflect::TypePath::type_path -#![cfg(not(target_family = "wasm"))] - -use core::net::{IpAddr, Ipv4Addr}; -use std::sync::RwLock; - -use anyhow::Result as AnyhowResult; +use async_channel::{Receiver, Sender}; use bevy_app::prelude::*; use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ entity::Entity, - system::{Commands, In, IntoSystem, Res, Resource, System, SystemId}, + system::{Commands, In, IntoSystem, Resource, System, SystemId}, world::World, }; -use bevy_reflect::Reflect; -use bevy_tasks::IoTaskPool; use bevy_utils::{prelude::default, HashMap}; -use http_body_util::{BodyExt as _, Full}; -use hyper::{ - body::{Bytes, Incoming}, - server::conn::http1, - service, Request, Response, -}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use smol::{ - channel::{self, Receiver, Sender}, - Async, -}; -use smol_hyper::rt::{FuturesIo, SmolTimer}; -use std::net::{TcpListener, TcpStream}; +use std::sync::RwLock; pub mod builtin_methods; - -/// The default port that Bevy will listen on. -/// -/// This value was chosen randomly. -pub const DEFAULT_PORT: u16 = 15702; - -/// The default host address that Bevy will use for its server. -pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +pub mod http; const CHANNEL_SIZE: usize = 16; /// Add this plugin to your [`App`] to allow remote connections to inspect and modify entities. /// /// This the main plugin for `bevy_remote`. See the [crate-level documentation] for details on -/// the protocol and its default methods. -/// -/// The defaults are: -/// - [`DEFAULT_ADDR`] : 127.0.0.1. -/// - [`DEFAULT_PORT`] : 15702. +/// the available protocols and its default methods. /// /// [crate-level documentation]: crate pub struct RemotePlugin { - /// The address that Bevy will use. - address: IpAddr, - - /// The port that Bevy will listen on. - port: u16, - /// The verbs that the server will recognize and respond to. methods: RwLock< Vec<( @@ -319,26 +283,10 @@ impl RemotePlugin { /// any associated methods. fn empty() -> Self { Self { - address: DEFAULT_ADDR, - port: DEFAULT_PORT, methods: RwLock::new(vec![]), } } - /// Set the IP address that the server will use. - #[must_use] - pub fn with_address(mut self, address: impl Into) -> Self { - self.address = address.into(); - self - } - - /// Set the remote port that the server will listen on. - #[must_use] - pub fn with_port(mut self, port: u16) -> Self { - self.port = port; - self - } - /// Add a remote method to the plugin using the given `name` and `handler`. #[must_use] pub fn with_method( @@ -403,28 +351,12 @@ impl Plugin for RemotePlugin { ); } - app.insert_resource(HostAddress(self.address)) - .insert_resource(HostPort(self.port)) - .insert_resource(remote_methods) - .add_systems(Startup, start_server) + app.insert_resource(remote_methods) + .add_systems(PreStartup, setup_mailbox_channel) .add_systems(Update, process_remote_requests); } } -/// A resource containing the IP address that Bevy will host on. -/// -/// Currently, changing this while the application is running has no effect; this merely -/// reflects the IP address that is set during the setup of the [`RemotePlugin`]. -#[derive(Debug, Resource)] -pub struct HostAddress(pub IpAddr); - -/// A resource containing the port number that Bevy will listen on. -/// -/// Currently, changing this while the application is running has no effect; this merely -/// reflects the host that is set during the setup of the [`RemotePlugin`]. -#[derive(Debug, Resource, Reflect)] -pub struct HostPort(pub u16); - /// The type of a function that implements a remote method (`bevy/get`, `bevy/query`, etc.) /// /// The first parameter is the JSON value of the `params`. Typically, an @@ -658,7 +590,7 @@ pub enum BrpBatch { /// A message from the Bevy Remote Protocol server thread to the main world. /// -/// This is placed in the [`BrpMailbox`]. +/// This is placed in the [`BrpReceiver`]. #[derive(Debug, Clone)] pub struct BrpMessage { /// The request method. @@ -673,41 +605,41 @@ pub struct BrpMessage { pub sender: Sender, } +/// A resource holding the matching sender for the [`BrpReceiver`]'s receiver. +#[derive(Debug, Resource, Deref, DerefMut)] +pub struct BrpSender(Sender); + /// A resource that receives messages sent by Bevy Remote Protocol clients. /// /// Every frame, the `process_remote_requests` system drains this mailbox and /// processes the messages within. #[derive(Debug, Resource, Deref, DerefMut)] -pub struct BrpMailbox(Receiver); +pub struct BrpReceiver(Receiver); -/// A system that starts up the Bevy Remote Protocol server. -fn start_server(mut commands: Commands, address: Res, remote_port: Res) { +fn setup_mailbox_channel(mut commands: Commands) { // Create the channel and the mailbox. - let (request_sender, request_receiver) = channel::bounded(CHANNEL_SIZE); - commands.insert_resource(BrpMailbox(request_receiver)); - - IoTaskPool::get() - .spawn(server_main(address.0, remote_port.0, request_sender)) - .detach(); + let (request_sender, request_receiver) = async_channel::bounded(CHANNEL_SIZE); + commands.insert_resource(BrpSender(request_sender)); + commands.insert_resource(BrpReceiver(request_receiver)); } -/// A system that receives requests placed in the [`BrpMailbox`] and processes +/// A system that receives requests placed in the [`BrpReceiver`] and processes /// them, using the [`RemoteMethods`] resource to map each request to its handler. /// /// This needs exclusive access to the [`World`] because clients can manipulate /// anything in the ECS. fn process_remote_requests(world: &mut World) { - if !world.contains_resource::() { + if !world.contains_resource::() { return; } - while let Ok(message) = world.resource_mut::().try_recv() { + while let Ok(message) = world.resource_mut::().try_recv() { // Fetch the handler for the method. If there's no such handler // registered, return an error. let methods = world.resource::(); let Some(handler) = methods.0.get(&message.method) else { - let _ = message.sender.send_blocking(Err(BrpError { + let _ = message.sender.force_send(Err(BrpError { code: error_codes::METHOD_NOT_FOUND, message: format!("Method `{}` not found", message.method), data: None, @@ -719,7 +651,7 @@ fn process_remote_requests(world: &mut World) { let result = match world.run_system_with_input(*handler, message.params) { Ok(result) => result, Err(error) => { - let _ = message.sender.send_blocking(Err(BrpError { + let _ = message.sender.force_send(Err(BrpError { code: error_codes::INTERNAL_ERROR, message: format!("Failed to run method handler: {error}"), data: None, @@ -728,139 +660,6 @@ fn process_remote_requests(world: &mut World) { } }; - let _ = message.sender.send_blocking(result); + let _ = message.sender.force_send(result); } } - -/// The Bevy Remote Protocol server main loop. -async fn server_main( - address: IpAddr, - port: u16, - request_sender: Sender, -) -> AnyhowResult<()> { - listen( - Async::::bind((address, port))?, - &request_sender, - ) - .await -} - -async fn listen( - listener: Async, - request_sender: &Sender, -) -> AnyhowResult<()> { - loop { - let (client, _) = listener.accept().await?; - - let request_sender = request_sender.clone(); - IoTaskPool::get() - .spawn(async move { - let _ = handle_client(client, request_sender).await; - }) - .detach(); - } -} - -async fn handle_client( - client: Async, - request_sender: Sender, -) -> AnyhowResult<()> { - http1::Builder::new() - .timer(SmolTimer::new()) - .serve_connection( - FuturesIo::new(client), - service::service_fn(|request| process_request_batch(request, &request_sender)), - ) - .await?; - - Ok(()) -} - -/// A helper function for the Bevy Remote Protocol server that handles a batch -/// of requests coming from a client. -async fn process_request_batch( - request: Request, - request_sender: &Sender, -) -> AnyhowResult>> { - let batch_bytes = request.into_body().collect().await?.to_bytes(); - let batch: Result = serde_json::from_slice(&batch_bytes); - - let serialized = match batch { - Ok(BrpBatch::Single(request)) => { - serde_json::to_string(&process_single_request(request, request_sender).await?)? - } - Ok(BrpBatch::Batch(requests)) => { - let mut responses = Vec::new(); - - for request in requests { - responses.push(process_single_request(request, request_sender).await?); - } - - serde_json::to_string(&responses)? - } - Err(err) => { - let err = BrpResponse::new( - None, - Err(BrpError { - code: error_codes::INVALID_REQUEST, - message: err.to_string(), - data: None, - }), - ); - - serde_json::to_string(&err)? - } - }; - - Ok(Response::new(Full::new(Bytes::from( - serialized.as_bytes().to_owned(), - )))) -} - -/// A helper function for the Bevy Remote Protocol server that processes a single -/// request coming from a client. -async fn process_single_request( - request: Value, - request_sender: &Sender, -) -> AnyhowResult { - // Reach in and get the request ID early so that we can report it even when parsing fails. - let id = request.as_object().and_then(|map| map.get("id")).cloned(); - - let request: BrpRequest = match serde_json::from_value(request) { - Ok(v) => v, - Err(err) => { - return Ok(BrpResponse::new( - id, - Err(BrpError { - code: error_codes::INVALID_REQUEST, - message: err.to_string(), - data: None, - }), - )); - } - }; - - if request.jsonrpc != "2.0" { - return Ok(BrpResponse::new( - id, - Err(BrpError { - code: error_codes::INVALID_REQUEST, - message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"), - data: None, - }), - )); - } - - let (result_sender, result_receiver) = channel::bounded(1); - - let _ = request_sender - .send(BrpMessage { - method: request.method, - params: request.params, - sender: result_sender, - }) - .await; - - let result = result_receiver.recv().await?; - Ok(BrpResponse::new(request.id, result)) -} diff --git a/examples/remote/client.rs b/examples/remote/client.rs index bf684b6471..7794aec052 100644 --- a/examples/remote/client.rs +++ b/examples/remote/client.rs @@ -5,7 +5,9 @@ use anyhow::Result as AnyhowResult; use argh::FromArgs; use bevy::remote::{ builtin_methods::{BrpQuery, BrpQueryFilter, BrpQueryParams, BRP_QUERY_METHOD}, - BrpRequest, DEFAULT_ADDR, DEFAULT_PORT, + http::DEFAULT_ADDR, + http::DEFAULT_PORT, + BrpRequest, }; /// Struct containing the command-line arguments that can be passed to this example. diff --git a/examples/remote/server.rs b/examples/remote/server.rs index c829de9472..bab8cacf4c 100644 --- a/examples/remote/server.rs +++ b/examples/remote/server.rs @@ -1,12 +1,16 @@ //! A Bevy app that you can connect to with the BRP and edit. -use bevy::{prelude::*, remote::RemotePlugin}; +use bevy::{ + prelude::*, + remote::{http::RemoteHttpPlugin, RemotePlugin}, +}; use serde::{Deserialize, Serialize}; fn main() { App::new() .add_plugins(DefaultPlugins) .add_plugins(RemotePlugin::default()) + .add_plugins(RemoteHttpPlugin::default()) .add_systems(Startup, setup) .register_type::() .run();