Allow using async_io::block_on in bevy_tasks (#9626)

# Objective

Fixes #9625

## Solution

Adds `async-io` as an optional dependency of `bevy_tasks`. When enabled,
this causes calls to `futures_lite::future::block_on` to be replaced
with calls to `async_io::block_on`.

---

## Changelog

- Added a new `async-io` feature to `bevy_tasks`. When enabled, this
causes `bevy_tasks` to use `async-io`'s implemention of `block_on`
instead of `futures-lite`'s implementation. You should enable this if
you use `async-io` in your application.
This commit is contained in:
Pixelstorm 2023-09-25 20:59:50 +01:00 committed by GitHub
parent 12032cd296
commit 503b861e3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 21 additions and 8 deletions

View file

@ -199,6 +199,9 @@ serialize = ["bevy_internal/serialize"]
# Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread. # Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.
multi-threaded = ["bevy_internal/multi-threaded"] multi-threaded = ["bevy_internal/multi-threaded"]
# Use async-io's implementation of block_on instead of futures-lite's implementation. This is preferred if your application uses async-io.
async-io = ["bevy_internal/async-io"]
# Wayland display server support # Wayland display server support
wayland = ["bevy_internal/wayland"] wayland = ["bevy_internal/wayland"]

View file

@ -166,7 +166,7 @@ impl AssetProcessor {
let processor = _processor.clone(); let processor = _processor.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
processor.process_assets(); processor.process_assets();
futures_lite::future::block_on(processor.listen_for_source_change_events()); bevy_tasks::block_on(processor.listen_for_source_change_events());
}); });
} }
} }
@ -190,7 +190,7 @@ impl AssetProcessor {
}); });
// This must happen _after_ the scope resolves or it will happen "too early" // This must happen _after_ the scope resolves or it will happen "too early"
// Don't move this into the async scope above! process_assets is a blocking/sync function this is fine // Don't move this into the async scope above! process_assets is a blocking/sync function this is fine
futures_lite::future::block_on(self.finish_processing_assets()); bevy_tasks::block_on(self.finish_processing_assets());
let end_time = std::time::Instant::now(); let end_time = std::time::Instant::now();
debug!("Processing finished in {:?}", end_time - start_time); debug!("Processing finished in {:?}", end_time - start_time);
} }

View file

@ -63,6 +63,7 @@ shader_format_spirv = ["bevy_render/shader_format_spirv"]
serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene?/serialize"] serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene?/serialize"]
multi-threaded = ["bevy_asset/multi-threaded", "bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"] multi-threaded = ["bevy_asset/multi-threaded", "bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"]
async-io = ["bevy_tasks/async-io"]
# Display server protocol support (X11 is enabled by default) # Display server protocol support (X11 is enabled by default)
wayland = ["bevy_winit/wayland"] wayland = ["bevy_winit/wayland"]

View file

@ -15,6 +15,7 @@ multi-threaded = []
futures-lite = "1.4.0" futures-lite = "1.4.0"
async-executor = "1.3.0" async-executor = "1.3.0"
async-channel = "1.4.2" async-channel = "1.4.2"
async-io = { version = "1.13.0", optional = true }
async-task = "4.2.0" async-task = "4.2.0"
concurrent-queue = "2.0.0" concurrent-queue = "2.0.0"

View file

@ -28,6 +28,11 @@ mod thread_executor;
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker};
#[cfg(feature = "async-io")]
pub use async_io::block_on;
#[cfg(not(feature = "async-io"))]
pub use futures_lite::future::block_on;
mod iter; mod iter;
pub use iter::ParallelIterator; pub use iter::ParallelIterator;
@ -35,6 +40,7 @@ pub use iter::ParallelIterator;
pub mod prelude { pub mod prelude {
#[doc(hidden)] #[doc(hidden)]
pub use crate::{ pub use crate::{
block_on,
iter::ParallelIterator, iter::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut}, slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},

View file

@ -9,9 +9,10 @@ use std::{
use async_task::FallibleTask; use async_task::FallibleTask;
use concurrent_queue::ConcurrentQueue; use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, FutureExt}; use futures_lite::FutureExt;
use crate::{ use crate::{
block_on,
thread_executor::{ThreadExecutor, ThreadExecutorTicker}, thread_executor::{ThreadExecutor, ThreadExecutorTicker},
Task, Task,
}; };
@ -176,7 +177,7 @@ impl TaskPool {
local_executor.tick().await; local_executor.tick().await;
} }
}; };
future::block_on(ex.run(tick_forever.or(shutdown_rx.recv()))) block_on(ex.run(tick_forever.or(shutdown_rx.recv())))
}); });
if let Ok(value) = res { if let Ok(value) = res {
// Use unwrap_err because we expect a Closed error // Use unwrap_err because we expect a Closed error
@ -379,7 +380,7 @@ impl TaskPool {
if spawned.is_empty() { if spawned.is_empty() {
Vec::new() Vec::new()
} else { } else {
future::block_on(async move { block_on(async move {
let get_results = async { let get_results = async {
let mut results = Vec::with_capacity(spawned.len()); let mut results = Vec::with_capacity(spawned.len());
while let Ok(task) = spawned.pop() { while let Ok(task) = spawned.pop() {
@ -661,7 +662,7 @@ where
T: 'scope, T: 'scope,
{ {
fn drop(&mut self) { fn drop(&mut self) {
future::block_on(async { block_on(async {
while let Ok(task) = self.spawned.pop() { while let Ok(task) = self.spawned.pop() {
task.cancel().await; task.cancel().await;
} }

View file

@ -43,6 +43,7 @@ The default feature set enables most of the expected features of a game engine,
|feature name|description| |feature name|description|
|-|-| |-|-|
|accesskit_unix|Enable AccessKit on Unix backends (currently only works with experimental screen readers and forks.)| |accesskit_unix|Enable AccessKit on Unix backends (currently only works with experimental screen readers and forks.)|
|async-io|Use async-io's implementation of block_on instead of futures-lite's implementation. This is preferred if your application uses async-io.|
|basis-universal|Basis Universal compressed texture support| |basis-universal|Basis Universal compressed texture support|
|bevy_ci_testing|Enable systems that allow for automated testing on CI| |bevy_ci_testing|Enable systems that allow for automated testing on CI|
|bevy_dynamic_plugin|Plugin for dynamic loading (using [libloading](https://crates.io/crates/libloading))| |bevy_dynamic_plugin|Plugin for dynamic loading (using [libloading](https://crates.io/crates/libloading))|

View file

@ -3,7 +3,7 @@
use bevy::{ use bevy::{
prelude::*, prelude::*,
tasks::{AsyncComputeTaskPool, Task}, tasks::{block_on, AsyncComputeTaskPool, Task},
}; };
use futures_lite::future; use futures_lite::future;
use rand::Rng; use rand::Rng;
@ -88,7 +88,7 @@ fn handle_tasks(
box_material_handle: Res<BoxMaterialHandle>, box_material_handle: Res<BoxMaterialHandle>,
) { ) {
for (entity, mut task) in &mut transform_tasks { for (entity, mut task) in &mut transform_tasks {
if let Some(transform) = future::block_on(future::poll_once(&mut task.0)) { if let Some(transform) = block_on(future::poll_once(&mut task.0)) {
// Add our new PbrBundle of components to our tagged entity // Add our new PbrBundle of components to our tagged entity
commands.entity(entity).insert(PbrBundle { commands.entity(entity).insert(PbrBundle {
mesh: box_mesh_handle.clone(), mesh: box_mesh_handle.clone(),