Refactor BRP to allow for 3rd-party transports (#15438)

## Objective

Closes #15408 (somewhat)

## Solution

- Moved the existing HTTP transport to its own module with its own
plugin (`RemoteHttpPlugin`) (disabled on WASM)
- Swapped out the `smol` crate for the smaller crates it re-exports to
make it easier to keep out non-wasm code (HTTP transport needs
`async-io` which can't build on WASM)
- Added a new public `BrpSender` resource holding the matching sender
for the `BrpReceiver`' (formally `BrpMailbox`). This allows other crates
to send `BrpMessage`'s to the "mailbox".

## Testing

TODO

---------

Co-authored-by: Matty <weatherleymatthew@gmail.com>
This commit is contained in:
Liam Gallagher 2024-09-28 08:09:46 +12:00 committed by GitHub
parent e788e3bc83
commit 60cf7ca025
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 285 additions and 233 deletions

View file

@ -27,10 +27,11 @@ hyper = { version = "1", features = ["server", "http1"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" } serde_json = { version = "1" }
http-body-util = "0.1" http-body-util = "0.1"
async-channel = "2"
# dependencies that will not compile on wasm # dependencies that will not compile on wasm
[target.'cfg(not(target_family = "wasm"))'.dependencies] [target.'cfg(not(target_family = "wasm"))'.dependencies]
smol = "2" async-io = "2"
smol-hyper = "0.1" smol-hyper = "0.1"
[lints] [lints]

View file

@ -0,0 +1,246 @@
//! The BRP transport using JSON-RPC over HTTP.
//!
//! Adding the [`RemoteHttpPlugin`] to your [`App`] causes Bevy to accept
//! connections over HTTP (by default, on port 15702) while your app is running.
//!
//! Clients are expected to `POST` JSON requests to the root URL; see the `client`
//! example for a trivial example of use.
#![cfg(not(target_family = "wasm"))]
use crate::{error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpSender};
use anyhow::Result as AnyhowResult;
use async_channel::Sender;
use async_io::Async;
use bevy_app::{App, Plugin, Startup};
use bevy_ecs::system::{Res, Resource};
use bevy_tasks::IoTaskPool;
use core::net::{IpAddr, Ipv4Addr};
use http_body_util::{BodyExt as _, Full};
use hyper::{
body::{Bytes, Incoming},
server::conn::http1,
service, Request, Response,
};
use serde_json::Value;
use smol_hyper::rt::{FuturesIo, SmolTimer};
use std::net::TcpListener;
use std::net::TcpStream;
/// The default port that Bevy will listen on.
///
/// This value was chosen randomly.
pub const DEFAULT_PORT: u16 = 15702;
/// The default host address that Bevy will use for its server.
pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
/// Add this plugin to your [`App`] to allow remote connections over HTTP to inspect and modify entities.
/// It requires the [`RemotePlugin`](super::RemotePlugin).
///
/// This BRP transport cannot be used when targeting WASM.
///
/// The defaults are:
/// - [`DEFAULT_ADDR`] : 127.0.0.1.
/// - [`DEFAULT_PORT`] : 15702.
pub struct RemoteHttpPlugin {
/// The address that Bevy will bind to.
address: IpAddr,
/// The port that Bevy will listen on.
port: u16,
}
impl Default for RemoteHttpPlugin {
fn default() -> Self {
Self {
address: DEFAULT_ADDR,
port: DEFAULT_PORT,
}
}
}
impl Plugin for RemoteHttpPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(HostAddress(self.address))
.insert_resource(HostPort(self.port))
.add_systems(Startup, start_http_server);
}
}
impl RemoteHttpPlugin {
/// Set the IP address that the server will use.
#[must_use]
pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
self.address = address.into();
self
}
/// Set the remote port that the server will listen on.
#[must_use]
pub fn with_port(mut self, port: u16) -> Self {
self.port = port;
self
}
}
/// A resource containing the IP address that Bevy will host on.
///
/// Currently, changing this while the application is running has no effect; this merely
/// reflects the IP address that is set during the setup of the [`RemoteHttpPlugin`].
#[derive(Debug, Resource)]
pub struct HostAddress(pub IpAddr);
/// A resource containing the port number that Bevy will listen on.
///
/// Currently, changing this while the application is running has no effect; this merely
/// reflects the host that is set during the setup of the [`RemoteHttpPlugin`].
#[derive(Debug, Resource)]
pub struct HostPort(pub u16);
/// A system that starts up the Bevy Remote Protocol HTTP server.
fn start_http_server(
request_sender: Res<BrpSender>,
address: Res<HostAddress>,
remote_port: Res<HostPort>,
) {
IoTaskPool::get()
.spawn(server_main(
address.0,
remote_port.0,
request_sender.clone(),
))
.detach();
}
/// The Bevy Remote Protocol server main loop.
async fn server_main(
address: IpAddr,
port: u16,
request_sender: Sender<BrpMessage>,
) -> AnyhowResult<()> {
listen(
Async::<TcpListener>::bind((address, port))?,
&request_sender,
)
.await
}
async fn listen(
listener: Async<TcpListener>,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<()> {
loop {
let (client, _) = listener.accept().await?;
let request_sender = request_sender.clone();
IoTaskPool::get()
.spawn(async move {
let _ = handle_client(client, request_sender).await;
})
.detach();
}
}
async fn handle_client(
client: Async<TcpStream>,
request_sender: Sender<BrpMessage>,
) -> AnyhowResult<()> {
http1::Builder::new()
.timer(SmolTimer::new())
.serve_connection(
FuturesIo::new(client),
service::service_fn(|request| process_request_batch(request, &request_sender)),
)
.await?;
Ok(())
}
/// A helper function for the Bevy Remote Protocol server that handles a batch
/// of requests coming from a client.
async fn process_request_batch(
request: Request<Incoming>,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<Response<Full<Bytes>>> {
let batch_bytes = request.into_body().collect().await?.to_bytes();
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
let serialized = match batch {
Ok(BrpBatch::Single(request)) => {
serde_json::to_string(&process_single_request(request, request_sender).await?)?
}
Ok(BrpBatch::Batch(requests)) => {
let mut responses = Vec::new();
for request in requests {
responses.push(process_single_request(request, request_sender).await?);
}
serde_json::to_string(&responses)?
}
Err(err) => {
let err = BrpResponse::new(
None,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: err.to_string(),
data: None,
}),
);
serde_json::to_string(&err)?
}
};
Ok(Response::new(Full::new(Bytes::from(
serialized.as_bytes().to_owned(),
))))
}
/// A helper function for the Bevy Remote Protocol server that processes a single
/// request coming from a client.
async fn process_single_request(
request: Value,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<BrpResponse> {
// Reach in and get the request ID early so that we can report it even when parsing fails.
let id = request.as_object().and_then(|map| map.get("id")).cloned();
let request: BrpRequest = match serde_json::from_value(request) {
Ok(v) => v,
Err(err) => {
return Ok(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: err.to_string(),
data: None,
}),
));
}
};
if request.jsonrpc != "2.0" {
return Ok(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
data: None,
}),
));
}
let (result_sender, result_receiver) = async_channel::bounded(1);
let _ = request_sender
.send(BrpMessage {
method: request.method,
params: request.params,
sender: result_sender,
})
.await;
let result = result_receiver.recv().await?;
Ok(BrpResponse::new(request.id, result))
}

View file

@ -1,11 +1,10 @@
//! An implementation of the Bevy Remote Protocol over HTTP and JSON, to allow //! An implementation of the Bevy Remote Protocol, to allow for remote control of a Bevy app.
//! for remote control of a Bevy app.
//! //!
//! Adding the [`RemotePlugin`] to your [`App`] causes Bevy to accept //! Adding the [`RemotePlugin`] to your [`App`] will setup everything needed without
//! connections over HTTP (by default, on port 15702) while your app is running. //! starting any transports. To start accepting remote connections you will need to
//! These *remote clients* can inspect and alter the state of the //! add a second plugin like the [`RemoteHttpPlugin`](http::RemoteHttpPlugin) to enable communication
//! entity-component system. Clients are expected to `POST` JSON requests to the //! over HTTP. These *remote clients* can inspect and alter the state of the
//! root URL; see the `client` example for a trivial example of use. //! entity-component system.
//! //!
//! The Bevy Remote Protocol is based on the JSON-RPC 2.0 protocol. //! The Bevy Remote Protocol is based on the JSON-RPC 2.0 protocol.
//! //!
@ -245,66 +244,31 @@
//! [fully-qualified type names]: bevy_reflect::TypePath::type_path //! [fully-qualified type names]: bevy_reflect::TypePath::type_path
//! [fully-qualified type name]: bevy_reflect::TypePath::type_path //! [fully-qualified type name]: bevy_reflect::TypePath::type_path
#![cfg(not(target_family = "wasm"))] use async_channel::{Receiver, Sender};
use core::net::{IpAddr, Ipv4Addr};
use std::sync::RwLock;
use anyhow::Result as AnyhowResult;
use bevy_app::prelude::*; use bevy_app::prelude::*;
use bevy_derive::{Deref, DerefMut}; use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{ use bevy_ecs::{
entity::Entity, entity::Entity,
system::{Commands, In, IntoSystem, Res, Resource, System, SystemId}, system::{Commands, In, IntoSystem, Resource, System, SystemId},
world::World, world::World,
}; };
use bevy_reflect::Reflect;
use bevy_tasks::IoTaskPool;
use bevy_utils::{prelude::default, HashMap}; use bevy_utils::{prelude::default, HashMap};
use http_body_util::{BodyExt as _, Full};
use hyper::{
body::{Bytes, Incoming},
server::conn::http1,
service, Request, Response,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use smol::{ use std::sync::RwLock;
channel::{self, Receiver, Sender},
Async,
};
use smol_hyper::rt::{FuturesIo, SmolTimer};
use std::net::{TcpListener, TcpStream};
pub mod builtin_methods; pub mod builtin_methods;
pub mod http;
/// The default port that Bevy will listen on.
///
/// This value was chosen randomly.
pub const DEFAULT_PORT: u16 = 15702;
/// The default host address that Bevy will use for its server.
pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const CHANNEL_SIZE: usize = 16; const CHANNEL_SIZE: usize = 16;
/// Add this plugin to your [`App`] to allow remote connections to inspect and modify entities. /// Add this plugin to your [`App`] to allow remote connections to inspect and modify entities.
/// ///
/// This the main plugin for `bevy_remote`. See the [crate-level documentation] for details on /// This the main plugin for `bevy_remote`. See the [crate-level documentation] for details on
/// the protocol and its default methods. /// the available protocols and its default methods.
///
/// The defaults are:
/// - [`DEFAULT_ADDR`] : 127.0.0.1.
/// - [`DEFAULT_PORT`] : 15702.
/// ///
/// [crate-level documentation]: crate /// [crate-level documentation]: crate
pub struct RemotePlugin { pub struct RemotePlugin {
/// The address that Bevy will use.
address: IpAddr,
/// The port that Bevy will listen on.
port: u16,
/// The verbs that the server will recognize and respond to. /// The verbs that the server will recognize and respond to.
methods: RwLock< methods: RwLock<
Vec<( Vec<(
@ -319,26 +283,10 @@ impl RemotePlugin {
/// any associated methods. /// any associated methods.
fn empty() -> Self { fn empty() -> Self {
Self { Self {
address: DEFAULT_ADDR,
port: DEFAULT_PORT,
methods: RwLock::new(vec![]), methods: RwLock::new(vec![]),
} }
} }
/// Set the IP address that the server will use.
#[must_use]
pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
self.address = address.into();
self
}
/// Set the remote port that the server will listen on.
#[must_use]
pub fn with_port(mut self, port: u16) -> Self {
self.port = port;
self
}
/// Add a remote method to the plugin using the given `name` and `handler`. /// Add a remote method to the plugin using the given `name` and `handler`.
#[must_use] #[must_use]
pub fn with_method<M>( pub fn with_method<M>(
@ -403,28 +351,12 @@ impl Plugin for RemotePlugin {
); );
} }
app.insert_resource(HostAddress(self.address)) app.insert_resource(remote_methods)
.insert_resource(HostPort(self.port)) .add_systems(PreStartup, setup_mailbox_channel)
.insert_resource(remote_methods)
.add_systems(Startup, start_server)
.add_systems(Update, process_remote_requests); .add_systems(Update, process_remote_requests);
} }
} }
/// A resource containing the IP address that Bevy will host on.
///
/// Currently, changing this while the application is running has no effect; this merely
/// reflects the IP address that is set during the setup of the [`RemotePlugin`].
#[derive(Debug, Resource)]
pub struct HostAddress(pub IpAddr);
/// A resource containing the port number that Bevy will listen on.
///
/// Currently, changing this while the application is running has no effect; this merely
/// reflects the host that is set during the setup of the [`RemotePlugin`].
#[derive(Debug, Resource, Reflect)]
pub struct HostPort(pub u16);
/// The type of a function that implements a remote method (`bevy/get`, `bevy/query`, etc.) /// The type of a function that implements a remote method (`bevy/get`, `bevy/query`, etc.)
/// ///
/// The first parameter is the JSON value of the `params`. Typically, an /// The first parameter is the JSON value of the `params`. Typically, an
@ -658,7 +590,7 @@ pub enum BrpBatch {
/// A message from the Bevy Remote Protocol server thread to the main world. /// A message from the Bevy Remote Protocol server thread to the main world.
/// ///
/// This is placed in the [`BrpMailbox`]. /// This is placed in the [`BrpReceiver`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BrpMessage { pub struct BrpMessage {
/// The request method. /// The request method.
@ -673,41 +605,41 @@ pub struct BrpMessage {
pub sender: Sender<BrpResult>, pub sender: Sender<BrpResult>,
} }
/// A resource holding the matching sender for the [`BrpReceiver`]'s receiver.
#[derive(Debug, Resource, Deref, DerefMut)]
pub struct BrpSender(Sender<BrpMessage>);
/// A resource that receives messages sent by Bevy Remote Protocol clients. /// A resource that receives messages sent by Bevy Remote Protocol clients.
/// ///
/// Every frame, the `process_remote_requests` system drains this mailbox and /// Every frame, the `process_remote_requests` system drains this mailbox and
/// processes the messages within. /// processes the messages within.
#[derive(Debug, Resource, Deref, DerefMut)] #[derive(Debug, Resource, Deref, DerefMut)]
pub struct BrpMailbox(Receiver<BrpMessage>); pub struct BrpReceiver(Receiver<BrpMessage>);
/// A system that starts up the Bevy Remote Protocol server. fn setup_mailbox_channel(mut commands: Commands) {
fn start_server(mut commands: Commands, address: Res<HostAddress>, remote_port: Res<HostPort>) {
// Create the channel and the mailbox. // Create the channel and the mailbox.
let (request_sender, request_receiver) = channel::bounded(CHANNEL_SIZE); let (request_sender, request_receiver) = async_channel::bounded(CHANNEL_SIZE);
commands.insert_resource(BrpMailbox(request_receiver)); commands.insert_resource(BrpSender(request_sender));
commands.insert_resource(BrpReceiver(request_receiver));
IoTaskPool::get()
.spawn(server_main(address.0, remote_port.0, request_sender))
.detach();
} }
/// A system that receives requests placed in the [`BrpMailbox`] and processes /// A system that receives requests placed in the [`BrpReceiver`] and processes
/// them, using the [`RemoteMethods`] resource to map each request to its handler. /// them, using the [`RemoteMethods`] resource to map each request to its handler.
/// ///
/// This needs exclusive access to the [`World`] because clients can manipulate /// This needs exclusive access to the [`World`] because clients can manipulate
/// anything in the ECS. /// anything in the ECS.
fn process_remote_requests(world: &mut World) { fn process_remote_requests(world: &mut World) {
if !world.contains_resource::<BrpMailbox>() { if !world.contains_resource::<BrpReceiver>() {
return; return;
} }
while let Ok(message) = world.resource_mut::<BrpMailbox>().try_recv() { while let Ok(message) = world.resource_mut::<BrpReceiver>().try_recv() {
// Fetch the handler for the method. If there's no such handler // Fetch the handler for the method. If there's no such handler
// registered, return an error. // registered, return an error.
let methods = world.resource::<RemoteMethods>(); let methods = world.resource::<RemoteMethods>();
let Some(handler) = methods.0.get(&message.method) else { let Some(handler) = methods.0.get(&message.method) else {
let _ = message.sender.send_blocking(Err(BrpError { let _ = message.sender.force_send(Err(BrpError {
code: error_codes::METHOD_NOT_FOUND, code: error_codes::METHOD_NOT_FOUND,
message: format!("Method `{}` not found", message.method), message: format!("Method `{}` not found", message.method),
data: None, data: None,
@ -719,7 +651,7 @@ fn process_remote_requests(world: &mut World) {
let result = match world.run_system_with_input(*handler, message.params) { let result = match world.run_system_with_input(*handler, message.params) {
Ok(result) => result, Ok(result) => result,
Err(error) => { Err(error) => {
let _ = message.sender.send_blocking(Err(BrpError { let _ = message.sender.force_send(Err(BrpError {
code: error_codes::INTERNAL_ERROR, code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"), message: format!("Failed to run method handler: {error}"),
data: None, data: None,
@ -728,139 +660,6 @@ fn process_remote_requests(world: &mut World) {
} }
}; };
let _ = message.sender.send_blocking(result); let _ = message.sender.force_send(result);
} }
} }
/// The Bevy Remote Protocol server main loop.
async fn server_main(
address: IpAddr,
port: u16,
request_sender: Sender<BrpMessage>,
) -> AnyhowResult<()> {
listen(
Async::<TcpListener>::bind((address, port))?,
&request_sender,
)
.await
}
async fn listen(
listener: Async<TcpListener>,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<()> {
loop {
let (client, _) = listener.accept().await?;
let request_sender = request_sender.clone();
IoTaskPool::get()
.spawn(async move {
let _ = handle_client(client, request_sender).await;
})
.detach();
}
}
async fn handle_client(
client: Async<TcpStream>,
request_sender: Sender<BrpMessage>,
) -> AnyhowResult<()> {
http1::Builder::new()
.timer(SmolTimer::new())
.serve_connection(
FuturesIo::new(client),
service::service_fn(|request| process_request_batch(request, &request_sender)),
)
.await?;
Ok(())
}
/// A helper function for the Bevy Remote Protocol server that handles a batch
/// of requests coming from a client.
async fn process_request_batch(
request: Request<Incoming>,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<Response<Full<Bytes>>> {
let batch_bytes = request.into_body().collect().await?.to_bytes();
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
let serialized = match batch {
Ok(BrpBatch::Single(request)) => {
serde_json::to_string(&process_single_request(request, request_sender).await?)?
}
Ok(BrpBatch::Batch(requests)) => {
let mut responses = Vec::new();
for request in requests {
responses.push(process_single_request(request, request_sender).await?);
}
serde_json::to_string(&responses)?
}
Err(err) => {
let err = BrpResponse::new(
None,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: err.to_string(),
data: None,
}),
);
serde_json::to_string(&err)?
}
};
Ok(Response::new(Full::new(Bytes::from(
serialized.as_bytes().to_owned(),
))))
}
/// A helper function for the Bevy Remote Protocol server that processes a single
/// request coming from a client.
async fn process_single_request(
request: Value,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<BrpResponse> {
// Reach in and get the request ID early so that we can report it even when parsing fails.
let id = request.as_object().and_then(|map| map.get("id")).cloned();
let request: BrpRequest = match serde_json::from_value(request) {
Ok(v) => v,
Err(err) => {
return Ok(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: err.to_string(),
data: None,
}),
));
}
};
if request.jsonrpc != "2.0" {
return Ok(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
data: None,
}),
));
}
let (result_sender, result_receiver) = channel::bounded(1);
let _ = request_sender
.send(BrpMessage {
method: request.method,
params: request.params,
sender: result_sender,
})
.await;
let result = result_receiver.recv().await?;
Ok(BrpResponse::new(request.id, result))
}

View file

@ -5,7 +5,9 @@ use anyhow::Result as AnyhowResult;
use argh::FromArgs; use argh::FromArgs;
use bevy::remote::{ use bevy::remote::{
builtin_methods::{BrpQuery, BrpQueryFilter, BrpQueryParams, BRP_QUERY_METHOD}, builtin_methods::{BrpQuery, BrpQueryFilter, BrpQueryParams, BRP_QUERY_METHOD},
BrpRequest, DEFAULT_ADDR, DEFAULT_PORT, http::DEFAULT_ADDR,
http::DEFAULT_PORT,
BrpRequest,
}; };
/// Struct containing the command-line arguments that can be passed to this example. /// Struct containing the command-line arguments that can be passed to this example.

View file

@ -1,12 +1,16 @@
//! A Bevy app that you can connect to with the BRP and edit. //! A Bevy app that you can connect to with the BRP and edit.
use bevy::{prelude::*, remote::RemotePlugin}; use bevy::{
prelude::*,
remote::{http::RemoteHttpPlugin, RemotePlugin},
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
fn main() { fn main() {
App::new() App::new()
.add_plugins(DefaultPlugins) .add_plugins(DefaultPlugins)
.add_plugins(RemotePlugin::default()) .add_plugins(RemotePlugin::default())
.add_plugins(RemoteHttpPlugin::default())
.add_systems(Startup, setup) .add_systems(Startup, setup)
.register_type::<Cube>() .register_type::<Cube>()
.run(); .run();