Watching versions of bevy/get and bevy/list with HTTP SSE (#15608)

## Objective

Add a way to stream BRP requests when the data changes.

## Solution

#### BRP Side (reusable for other transports)

Add a new method handler type that returns a optional value. This
handler is run in update and if a value is returned it will be sent on
the message channel. Custom watching handlers can be added with
`RemotePlugin::with_watching_method`.

#### HTTP Side

If a request comes in with `+watch` in the method, it will respond with
`text/event-stream` rather than a single response.

## Testing

I tested with the podman HTTP client. This client has good support for
SSE's if you want to test it too.

## Parts I want some opinions on

- For separating watching methods I chose to add a `+watch` suffix to
the end kind of like `content-type` headers. A get would be
`bevy/get+watch`.
- Should watching methods send an initial response with everything or
only respond when a change happens? Currently the later is what happens.

## Future work

- The `bevy/query` method would also benefit from this but that
condition will be quite complex so I will leave that to later.

---------

Co-authored-by: Zachary Harrold <zac@harrold.com.au>
This commit is contained in:
Liam Gallagher 2024-10-09 05:21:46 +13:00 committed by GitHub
parent 21b78b5990
commit f1fbb668f9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 557 additions and 71 deletions

View file

@ -6,9 +6,11 @@ use anyhow::{anyhow, Result as AnyhowResult};
use bevy_ecs::{
component::ComponentId,
entity::Entity,
event::EventCursor,
query::QueryBuilder,
reflect::{AppTypeRegistry, ReflectComponent},
system::In,
removal_detection::RemovedComponentEntity,
system::{In, Local},
world::{EntityRef, EntityWorldMut, FilteredEntityRef, World},
};
use bevy_hierarchy::BuildChildren as _;
@ -46,6 +48,12 @@ pub const BRP_REPARENT_METHOD: &str = "bevy/reparent";
/// The method path for a `bevy/list` request.
pub const BRP_LIST_METHOD: &str = "bevy/list";
/// The method path for a `bevy/get+watch` request.
pub const BRP_GET_AND_WATCH_METHOD: &str = "bevy/get+watch";
/// The method path for a `bevy/list+watch` request.
pub const BRP_LIST_AND_WATCH_METHOD: &str = "bevy/list+watch";
/// `bevy/get`: Retrieves one or more components from the entity with the given
/// ID.
///
@ -248,9 +256,41 @@ pub enum BrpGetResponse {
Strict(HashMap<String, Value>),
}
/// A single response from a `bevy/get+watch` request.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum BrpGetWatchingResponse {
/// The non-strict response that reports errors separately without failing the entire request.
Lenient {
/// A map of successful components with their values that were added or changes in the last
/// tick.
components: HashMap<String, Value>,
/// An array of components that were been removed in the last tick.
removed: Vec<String>,
/// A map of unsuccessful components with their errors.
errors: HashMap<String, Value>,
},
/// The strict response that will fail if any components are not present or aren't
/// reflect-able.
Strict {
/// A map of successful components with their values that were added or changes in the last
/// tick.
components: HashMap<String, Value>,
/// An array of components that were been removed in the last tick.
removed: Vec<String>,
},
}
/// The response to a `bevy/list` request.
pub type BrpListResponse = Vec<String>;
/// A single response from a `bevy/list+watch` request.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct BrpListWatchingResponse {
added: Vec<String>,
removed: Vec<String>,
}
/// The response to a `bevy/query` request.
pub type BrpQueryResponse = Vec<BrpQueryRow>;
@ -301,6 +341,117 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
let type_registry = app_type_registry.read();
let entity_ref = get_entity(world, entity)?;
let response =
reflect_components_to_response(components, strict, entity, entity_ref, &type_registry)?;
serde_json::to_value(response).map_err(BrpError::internal)
}
/// Handles a `bevy/get+watch` request coming from a client.
pub fn process_remote_get_watching_request(
In(params): In<Option<Value>>,
world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
let BrpGetParams {
entity,
components,
strict,
} = parse_some(params)?;
let app_type_registry = world.resource::<AppTypeRegistry>();
let type_registry = app_type_registry.read();
let entity_ref = get_entity(world, entity)?;
let mut changed = Vec::new();
let mut removed = Vec::new();
let mut errors = HashMap::new();
'component_loop: for component_path in components {
let Ok(type_registration) =
get_component_type_registration(&type_registry, &component_path)
else {
let err =
BrpError::component_error(format!("Unknown component type: `{component_path}`"));
if strict {
return Err(err);
}
errors.insert(
component_path,
serde_json::to_value(err).map_err(BrpError::internal)?,
);
continue;
};
let Some(component_id) = world.components().get_id(type_registration.type_id()) else {
let err = BrpError::component_error(format!("Unknown component: `{component_path}`"));
if strict {
return Err(err);
}
errors.insert(
component_path,
serde_json::to_value(err).map_err(BrpError::internal)?,
);
continue;
};
if let Some(ticks) = entity_ref.get_change_ticks_by_id(component_id) {
if ticks.is_changed(world.last_change_tick(), world.read_change_tick()) {
changed.push(component_path);
continue;
}
};
let Some(events) = world.removed_components().get(component_id) else {
continue;
};
let cursor = removal_cursors
.entry(component_id)
.or_insert_with(|| events.get_cursor());
for event in cursor.read(events) {
if Entity::from(event.clone()) == entity {
removed.push(component_path);
continue 'component_loop;
}
}
}
if changed.is_empty() && removed.is_empty() {
return Ok(None);
}
let response =
reflect_components_to_response(changed, strict, entity, entity_ref, &type_registry)?;
let response = match response {
BrpGetResponse::Lenient {
components,
errors: mut errs,
} => BrpGetWatchingResponse::Lenient {
components,
removed,
errors: {
errs.extend(errors);
errs
},
},
BrpGetResponse::Strict(components) => BrpGetWatchingResponse::Strict {
components,
removed,
},
};
Ok(Some(
serde_json::to_value(response).map_err(BrpError::internal)?,
))
}
/// Reflect a list of components on an entity into a [`BrpGetResponse`].
fn reflect_components_to_response(
components: Vec<String>,
strict: bool,
entity: Entity,
entity_ref: EntityRef,
type_registry: &TypeRegistry,
) -> BrpResult<BrpGetResponse> {
let mut response = if strict {
BrpGetResponse::Strict(Default::default())
} else {
@ -311,7 +462,7 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
};
for component_path in components {
match handle_get_component(&component_path, entity, entity_ref, &type_registry) {
match reflect_component(&component_path, entity, entity_ref, type_registry) {
Ok(serialized_object) => match response {
BrpGetResponse::Strict(ref mut components)
| BrpGetResponse::Lenient {
@ -330,16 +481,16 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
}
}
serde_json::to_value(response).map_err(BrpError::internal)
Ok(response)
}
/// Handle a single component for [`process_remote_get_request`].
fn handle_get_component(
/// Reflect a single component on an entity with the given component path.
fn reflect_component(
component_path: &str,
entity: Entity,
entity_ref: EntityRef,
type_registry: &TypeRegistry,
) -> Result<Map<String, Value>, BrpError> {
) -> BrpResult<Map<String, Value>> {
let reflect_component =
get_reflect_component(type_registry, component_path).map_err(BrpError::component_error)?;
@ -596,6 +747,52 @@ pub fn process_remote_list_request(In(params): In<Option<Value>>, world: &World)
serde_json::to_value(response).map_err(BrpError::internal)
}
/// Handles a `bevy/list` request (list all components) coming from a client.
pub fn process_remote_list_watching_request(
In(params): In<Option<Value>>,
world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
let BrpListParams { entity } = parse_some(params)?;
let entity_ref = get_entity(world, entity)?;
let mut response = BrpListWatchingResponse::default();
for component_id in entity_ref.archetype().components() {
let ticks = entity_ref
.get_change_ticks_by_id(component_id)
.ok_or(BrpError::internal("Failed to get ticks"))?;
if ticks.is_added(world.last_change_tick(), world.read_change_tick()) {
let Some(component_info) = world.components().get_info(component_id) else {
continue;
};
response.added.push(component_info.name().to_owned());
}
}
for (component_id, events) in world.removed_components().iter() {
let cursor = removal_cursors
.entry(*component_id)
.or_insert_with(|| events.get_cursor());
for event in cursor.read(events) {
if Entity::from(event.clone()) == entity {
let Some(component_info) = world.components().get_info(*component_id) else {
continue;
};
response.removed.push(component_info.name().to_owned());
}
}
}
if response.added.is_empty() && response.removed.is_empty() {
Ok(None)
} else {
Ok(Some(
serde_json::to_value(response).map_err(BrpError::internal)?,
))
}
}
/// Immutably retrieves an entity from the [`World`], returning an error if the
/// entity isn't present.
fn get_entity(world: &World, entity: Entity) -> Result<EntityRef<'_>, BrpError> {

View file

@ -8,26 +8,32 @@
#![cfg(not(target_family = "wasm"))]
use crate::{error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpSender};
use crate::{
error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpResult, BrpSender,
};
use anyhow::Result as AnyhowResult;
use async_channel::Sender;
use async_channel::{Receiver, Sender};
use async_io::Async;
use bevy_app::{App, Plugin, Startup};
use bevy_ecs::system::{Res, Resource};
use bevy_tasks::IoTaskPool;
use bevy_tasks::{futures_lite::StreamExt, IoTaskPool};
use core::net::{IpAddr, Ipv4Addr};
use core::{
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};
use http_body_util::{BodyExt as _, Full};
use hyper::header::{HeaderName, HeaderValue};
use hyper::{
body::{Bytes, Incoming},
body::{Body, Bytes, Frame, Incoming},
server::conn::http1,
service, Request, Response,
};
use serde_json::Value;
use smol_hyper::rt::{FuturesIo, SmolTimer};
use std::collections::HashMap;
use std::net::TcpListener;
use std::net::TcpStream;
use std::net::{TcpListener, TcpStream};
/// The default port that Bevy will listen on.
///
@ -259,22 +265,41 @@ async fn process_request_batch(
request: Request<Incoming>,
request_sender: &Sender<BrpMessage>,
headers: &Headers,
) -> AnyhowResult<Response<Full<Bytes>>> {
) -> AnyhowResult<Response<BrpHttpBody>> {
let batch_bytes = request.into_body().collect().await?.to_bytes();
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
let serialized = match batch {
let result = match batch {
Ok(BrpBatch::Single(request)) => {
serde_json::to_string(&process_single_request(request, request_sender).await?)?
let response = process_single_request(request, request_sender).await?;
match response {
BrpHttpResponse::Complete(res) => {
BrpHttpResponse::Complete(serde_json::to_string(&res)?)
}
BrpHttpResponse::Stream(stream) => BrpHttpResponse::Stream(stream),
}
}
Ok(BrpBatch::Batch(requests)) => {
let mut responses = Vec::new();
for request in requests {
responses.push(process_single_request(request, request_sender).await?);
let response = process_single_request(request, request_sender).await?;
match response {
BrpHttpResponse::Complete(res) => responses.push(res),
BrpHttpResponse::Stream(BrpStream { id, .. }) => {
responses.push(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: "Streaming can not be used in batch requests".to_string(),
data: None,
}),
));
}
}
}
serde_json::to_string(&responses)?
BrpHttpResponse::Complete(serde_json::to_string(&responses)?)
}
Err(err) => {
let err = BrpResponse::new(
@ -286,15 +311,30 @@ async fn process_request_batch(
}),
);
serde_json::to_string(&err)?
BrpHttpResponse::Complete(serde_json::to_string(&err)?)
}
};
let mut response = Response::new(Full::new(Bytes::from(serialized.as_bytes().to_owned())));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
let mut response = match result {
BrpHttpResponse::Complete(serialized) => {
let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
serialized.as_bytes().to_owned(),
))));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
response
}
BrpHttpResponse::Stream(stream) => {
let mut response = Response::new(BrpHttpBody::Stream(stream));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("text/event-stream"),
);
response
}
};
for (key, value) in &headers.headers {
response.headers_mut().insert(key, value.clone());
}
@ -306,36 +346,38 @@ async fn process_request_batch(
async fn process_single_request(
request: Value,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<BrpResponse> {
) -> AnyhowResult<BrpHttpResponse<BrpResponse, BrpStream>> {
// Reach in and get the request ID early so that we can report it even when parsing fails.
let id = request.as_object().and_then(|map| map.get("id")).cloned();
let request: BrpRequest = match serde_json::from_value(request) {
Ok(v) => v,
Err(err) => {
return Ok(BrpResponse::new(
return Ok(BrpHttpResponse::Complete(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: err.to_string(),
data: None,
}),
));
)));
}
};
if request.jsonrpc != "2.0" {
return Ok(BrpResponse::new(
return Ok(BrpHttpResponse::Complete(BrpResponse::new(
id,
Err(BrpError {
code: error_codes::INVALID_REQUEST,
message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
data: None,
}),
));
)));
}
let (result_sender, result_receiver) = async_channel::bounded(1);
let watch = request.method.contains("+watch");
let size = if watch { 8 } else { 1 };
let (result_sender, result_receiver) = async_channel::bounded(size);
let _ = request_sender
.send(BrpMessage {
@ -345,6 +387,74 @@ async fn process_single_request(
})
.await;
let result = result_receiver.recv().await?;
Ok(BrpResponse::new(request.id, result))
if watch {
Ok(BrpHttpResponse::Stream(BrpStream {
id: request.id,
rx: Box::pin(result_receiver),
}))
} else {
let result = result_receiver.recv().await?;
Ok(BrpHttpResponse::Complete(BrpResponse::new(
request.id, result,
)))
}
}
struct BrpStream {
id: Option<Value>,
rx: Pin<Box<Receiver<BrpResult>>>,
}
impl Body for BrpStream {
type Data = Bytes;
type Error = Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.as_mut().rx.poll_next(cx) {
Poll::Ready(result) => match result {
Some(result) => {
let response = BrpResponse::new(self.id.clone(), result);
let serialized = serde_json::to_string(&response).unwrap();
let bytes =
Bytes::from(format!("data: {serialized}\n\n").as_bytes().to_owned());
let frame = Frame::data(bytes);
Poll::Ready(Some(Ok(frame)))
}
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}
fn is_end_stream(&self) -> bool {
dbg!(self.rx.is_closed())
}
}
enum BrpHttpResponse<C, S> {
Complete(C),
Stream(S),
}
enum BrpHttpBody {
Complete(Full<Bytes>),
Stream(BrpStream),
}
impl Body for BrpHttpBody {
type Data = Bytes;
type Error = Infallible;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut *self.get_mut() {
BrpHttpBody::Complete(body) => Body::poll_frame(Pin::new(body), cx),
BrpHttpBody::Stream(body) => Body::poll_frame(Pin::new(body), cx),
}
}
}

View file

@ -212,6 +212,51 @@
//!
//! `result`: An array of fully-qualified type names of components.
//!
//! ### bevy/get+watch
//!
//! Watch the values of one or more components from an entity.
//!
//! `params`:
//! - `entity`: The ID of the entity whose components will be fetched.
//! - `components`: An array of [fully-qualified type names] of components to fetch.
//! - `strict` (optional): A flag to enable strict mode which will fail if any one of the
//! components is not present or can not be reflected. Defaults to false.
//!
//! If `strict` is false:
//!
//! `result`:
//! - `components`: A map of components added or changed in the last tick associating each type
//! name to its value on the requested entity.
//! - `removed`: An array of fully-qualified type names of components removed from the entity
//! in the last tick.
//! - `errors`: A map associating each type name with an error if it was not on the entity
//! or could not be reflected.
//!
//! If `strict` is true:
//!
//! `result`:
//! - `components`: A map of components added or changed in the last tick associating each type
//! name to its value on the requested entity.
//! - `removed`: An array of fully-qualified type names of components removed from the entity
//! in the last tick.
//!
//! ### bevy/list+watch
//!
//! Watch all components present on an entity.
//!
//! When `params` is not provided, this lists all registered components. If `params` is provided,
//! this lists only those components present on the provided entity.
//!
//! `params`:
//! - `entity`: The ID of the entity whose components will be listed.
//!
//! `result`:
//! - `added`: An array of fully-qualified type names of components added to the entity in the
//! last tick.
//! - `removed`: An array of fully-qualified type names of components removed from the entity
//! in the last tick.
//!
//!
//! ## Custom methods
//!
//! In addition to the provided methods, the Bevy Remote Protocol can be extended to include custom
@ -260,7 +305,8 @@ use bevy_app::prelude::*;
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
entity::Entity,
system::{Commands, In, IntoSystem, Resource, System, SystemId},
schedule::IntoSystemConfigs,
system::{Commands, In, IntoSystem, ResMut, Resource, System, SystemId},
world::World,
};
use bevy_utils::{prelude::default, HashMap};
@ -281,12 +327,7 @@ const CHANNEL_SIZE: usize = 16;
/// [crate-level documentation]: crate
pub struct RemotePlugin {
/// The verbs that the server will recognize and respond to.
methods: RwLock<
Vec<(
String,
Box<dyn System<In = In<Option<Value>>, Out = BrpResult>>,
)>,
>,
methods: RwLock<Vec<(String, RemoteMethodHandler)>>,
}
impl RemotePlugin {
@ -305,10 +346,24 @@ impl RemotePlugin {
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult, M>,
) -> Self {
self.methods
.get_mut()
.unwrap()
.push((name.into(), Box::new(IntoSystem::into_system(handler))));
self.methods.get_mut().unwrap().push((
name.into(),
RemoteMethodHandler::Instant(Box::new(IntoSystem::into_system(handler))),
));
self
}
/// Add a remote method with a watching handler to the plugin using the given `name`.
#[must_use]
pub fn with_watching_method<M>(
mut self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>,
) -> Self {
self.methods.get_mut().unwrap().push((
name.into(),
RemoteMethodHandler::Watching(Box::new(IntoSystem::into_system(handler))),
));
self
}
}
@ -348,40 +403,93 @@ impl Default for RemotePlugin {
builtin_methods::BRP_LIST_METHOD,
builtin_methods::process_remote_list_request,
)
.with_watching_method(
builtin_methods::BRP_GET_AND_WATCH_METHOD,
builtin_methods::process_remote_get_watching_request,
)
.with_watching_method(
builtin_methods::BRP_LIST_AND_WATCH_METHOD,
builtin_methods::process_remote_list_watching_request,
)
}
}
impl Plugin for RemotePlugin {
fn build(&self, app: &mut App) {
let mut remote_methods = RemoteMethods::new();
let plugin_methods = &mut *self.methods.write().unwrap();
for (name, system) in plugin_methods.drain(..) {
for (name, handler) in plugin_methods.drain(..) {
remote_methods.insert(
name,
app.main_mut().world_mut().register_boxed_system(system),
match handler {
RemoteMethodHandler::Instant(system) => RemoteMethodSystemId::Instant(
app.main_mut().world_mut().register_boxed_system(system),
),
RemoteMethodHandler::Watching(system) => RemoteMethodSystemId::Watching(
app.main_mut().world_mut().register_boxed_system(system),
),
},
);
}
app.insert_resource(remote_methods)
.init_resource::<RemoteWatchingRequests>()
.add_systems(PreStartup, setup_mailbox_channel)
.add_systems(Update, process_remote_requests);
.add_systems(
Update,
(
process_remote_requests,
process_ongoing_watching_requests,
remove_closed_watching_requests,
)
.chain(),
);
}
}
/// The type of a function that implements a remote method (`bevy/get`, `bevy/query`, etc.)
/// A type to hold the allowed types of systems to be used as method handlers.
#[derive(Debug)]
pub enum RemoteMethodHandler {
/// A handler that only runs once and returns one response.
Instant(Box<dyn System<In = In<Option<Value>>, Out = BrpResult>>),
/// A handler that watches for changes and response when a change is detected.
Watching(Box<dyn System<In = In<Option<Value>>, Out = BrpResult<Option<Value>>>>),
}
/// The [`SystemId`] of a function that implements a remote instant method (`bevy/get`, `bevy/query`, etc.)
///
/// The first parameter is the JSON value of the `params`. Typically, an
/// implementation will deserialize these as the first thing they do.
///
/// The returned JSON value will be returned as the response. Bevy will
/// automatically populate the `id` field before sending.
pub type RemoteMethod = SystemId<In<Option<Value>>, BrpResult>;
pub type RemoteInstantMethodSystemId = SystemId<In<Option<Value>>, BrpResult>;
/// The [`SystemId`] of a function that implements a remote watching method (`bevy/get+watch`, `bevy/list+watch`, etc.)
///
/// The first parameter is the JSON value of the `params`. Typically, an
/// implementation will deserialize these as the first thing they do.
///
/// The optional returned JSON value will be sent as a response. If no
/// changes were detected this should be [`None`]. Re-running of this
/// handler is done in the [`RemotePlugin`].
pub type RemoteWatchingMethodSystemId = SystemId<In<Option<Value>>, BrpResult<Option<Value>>>;
/// The [`SystemId`] of a function that can be used as a remote method.
#[derive(Debug, Clone, Copy)]
pub enum RemoteMethodSystemId {
/// A handler that only runs once and returns one response.
Instant(RemoteInstantMethodSystemId),
/// A handler that watches for changes and response when a change is detected.
Watching(RemoteWatchingMethodSystemId),
}
/// Holds all implementations of methods known to the server.
///
/// Custom methods can be added to this list using [`RemoteMethods::insert`].
#[derive(Debug, Resource, Default)]
pub struct RemoteMethods(HashMap<String, RemoteMethod>);
pub struct RemoteMethods(HashMap<String, RemoteMethodSystemId>);
impl RemoteMethods {
/// Creates a new [`RemoteMethods`] resource with no methods registered in it.
@ -395,17 +503,21 @@ impl RemoteMethods {
pub fn insert(
&mut self,
method_name: impl Into<String>,
handler: RemoteMethod,
) -> Option<RemoteMethod> {
handler: RemoteMethodSystemId,
) -> Option<RemoteMethodSystemId> {
self.0.insert(method_name.into(), handler)
}
/// Retrieves a handler by method name.
pub fn get(&self, method_name: &str) -> Option<&RemoteMethod> {
self.0.get(method_name)
/// Get a [`RemoteMethodSystemId`] with its method name.
pub fn get(&self, method: &str) -> Option<&RemoteMethodSystemId> {
self.0.get(method)
}
}
/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their handlers.
#[derive(Debug, Resource, Default)]
pub struct RemoteWatchingRequests(Vec<(BrpMessage, RemoteWatchingMethodSystemId)>);
/// A single request from a Bevy Remote Protocol client to the server,
/// serialized in JSON.
///
@ -590,7 +702,7 @@ pub mod error_codes {
}
/// The result of a request.
pub type BrpResult = Result<Value, BrpError>;
pub type BrpResult<T = Value> = Result<T, BrpError>;
/// The requests may occur on their own or in batches.
/// Actual parsing is deferred for the sake of proper
@ -652,30 +764,83 @@ fn process_remote_requests(world: &mut World) {
while let Ok(message) = world.resource_mut::<BrpReceiver>().try_recv() {
// Fetch the handler for the method. If there's no such handler
// registered, return an error.
let methods = world.resource::<RemoteMethods>();
let Some(handler) = methods.0.get(&message.method) else {
let Some(&handler) = world.resource::<RemoteMethods>().get(&message.method) else {
let _ = message.sender.force_send(Err(BrpError {
code: error_codes::METHOD_NOT_FOUND,
message: format!("Method `{}` not found", message.method),
data: None,
}));
continue;
return;
};
// Execute the handler, and send the result back to the client.
let result = match world.run_system_with_input(*handler, message.params) {
Ok(result) => result,
Err(error) => {
let _ = message.sender.force_send(Err(BrpError {
code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"),
data: None,
}));
continue;
match handler {
RemoteMethodSystemId::Instant(id) => {
let result = match world.run_system_with_input(id, message.params) {
Ok(result) => result,
Err(error) => {
let _ = message.sender.force_send(Err(BrpError {
code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"),
data: None,
}));
continue;
}
};
let _ = message.sender.force_send(result);
}
};
let _ = message.sender.force_send(result);
RemoteMethodSystemId::Watching(id) => {
world
.resource_mut::<RemoteWatchingRequests>()
.0
.push((message, id));
}
}
}
}
/// A system that checks all ongoing watching requests for changes that should be sent
/// and handles it if so.
fn process_ongoing_watching_requests(world: &mut World) {
world.resource_scope::<RemoteWatchingRequests, ()>(|world, requests| {
for (message, system_id) in requests.0.iter() {
let handler_result = process_single_ongoing_watching_request(world, message, system_id);
let sender_result = match handler_result {
Ok(Some(value)) => message.sender.try_send(Ok(value)),
Err(err) => message.sender.try_send(Err(err)),
Ok(None) => continue,
};
if sender_result.is_err() {
// The [`remove_closed_watching_requests`] system will clean this up.
message.sender.close();
}
}
});
}
fn process_single_ongoing_watching_request(
world: &mut World,
message: &BrpMessage,
system_id: &RemoteWatchingMethodSystemId,
) -> BrpResult<Option<Value>> {
world
.run_system_with_input(*system_id, message.params.clone())
.map_err(|error| BrpError {
code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"),
data: None,
})?
}
fn remove_closed_watching_requests(mut requests: ResMut<RemoteWatchingRequests>) {
for i in (0..requests.0.len()).rev() {
let Some((message, _)) = requests.0.get(i) else {
unreachable!()
};
if message.sender.is_closed() {
requests.0.swap_remove(i);
}
}
}

View file

@ -1,6 +1,8 @@
//! A Bevy app that you can connect to with the BRP and edit.
use bevy::math::ops::cos;
use bevy::{
input::common_conditions::input_just_pressed,
prelude::*,
remote::{http::RemoteHttpPlugin, RemotePlugin},
};
@ -12,6 +14,8 @@ fn main() {
.add_plugins(RemotePlugin::default())
.add_plugins(RemoteHttpPlugin::default())
.add_systems(Startup, setup)
.add_systems(Update, remove.run_if(input_just_pressed(KeyCode::Space)))
.add_systems(Update, move_cube)
.register_type::<Cube>()
.run();
}
@ -52,6 +56,16 @@ fn setup(
));
}
fn move_cube(mut query: Query<&mut Transform, With<Cube>>, time: Res<Time>) {
for mut transform in &mut query {
transform.translation.y = -cos(time.elapsed_seconds()) + 1.5;
}
}
fn remove(mut commands: Commands, query: Query<Entity, With<Cube>>) {
commands.entity(query.single()).remove::<Cube>();
}
#[derive(Component, Reflect, Serialize, Deserialize)]
#[reflect(Component, Serialize, Deserialize)]
struct Cube(f32);