2020-08-29 19:35:41 +00:00
|
|
|
use std::{
|
|
|
|
future::Future,
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because, while systems can access non-Send data, the systems themselves are Send. Because of this change we lose the ability to spawn non-Send tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and passing `Scope`, you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope`. This should cover cases where you needed to run tasks on the local thread, but does not cover spawning non-Send futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
marker::PhantomData,
|
2020-08-29 19:35:41 +00:00
|
|
|
mem,
|
|
|
|
pin::Pin,
|
2020-09-09 20:12:50 +00:00
|
|
|
sync::Arc,
|
2020-08-29 19:35:41 +00:00
|
|
|
thread::{self, JoinHandle},
|
|
|
|
};
|
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
use concurrent_queue::ConcurrentQueue;
|
2022-10-24 13:46:40 +00:00
|
|
|
use futures_lite::{future, pin, FutureExt};
|
2020-09-10 19:54:24 +00:00
|
|
|
|
2020-09-22 03:23:09 +00:00
|
|
|
use crate::Task;
|
|
|
|
|
2021-12-29 17:38:13 +00:00
|
|
|
/// Used to create a [`TaskPool`]
///
/// Every option is optional; an unset option is passed on as `None` when the pool is built.
#[derive(Debug, Default, Clone)]
#[must_use]
pub struct TaskPoolBuilder {
    /// If set, the pool is built with this many worker threads.
    /// Otherwise the count is taken from `crate::available_parallelism`.
    num_threads: Option<usize>,
    /// If set, we'll use the given stack size rather than the system default
    stack_size: Option<usize>,
    /// Allows customizing the name of the threads - helpful for debugging. If set, threads will
    /// be named `<thread_name> (<thread_index>)`, e.g. `MyThreadPool (2)`
    thread_name: Option<String>,
}
|
|
|
|
|
|
|
|
impl TaskPoolBuilder {
|
2021-12-29 17:38:13 +00:00
|
|
|
/// Creates a new [`TaskPoolBuilder`] instance
|
2020-08-29 19:35:41 +00:00
|
|
|
pub fn new() -> Self {
|
|
|
|
Self::default()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Override the number of threads created for the pool. If unset, we default to the number
|
|
|
|
/// of logical cores of the system
|
|
|
|
pub fn num_threads(mut self, num_threads: usize) -> Self {
|
|
|
|
self.num_threads = Some(num_threads);
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Override the stack size of the threads created for the pool
|
|
|
|
pub fn stack_size(mut self, stack_size: usize) -> Self {
|
|
|
|
self.stack_size = Some(stack_size);
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Override the name of the threads created for the pool. If set, threads will
|
2021-12-29 17:38:13 +00:00
|
|
|
/// be named `<thread_name> (<thread_index>)`, i.e. `MyThreadPool (2)`
|
2020-08-29 19:35:41 +00:00
|
|
|
pub fn thread_name(mut self, thread_name: String) -> Self {
|
|
|
|
self.thread_name = Some(thread_name);
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
2021-12-29 17:38:13 +00:00
|
|
|
/// Creates a new [`TaskPool`] based on the current options.
|
2020-08-29 19:35:41 +00:00
|
|
|
pub fn build(self) -> TaskPool {
|
|
|
|
TaskPool::new_internal(
|
|
|
|
self.num_threads,
|
|
|
|
self.stack_size,
|
|
|
|
self.thread_name.as_deref(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
|
|
|
|
/// the pool on threads owned by the pool.
|
Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load.
Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.
This PR makes does the following:
* Adds `*TaskPool::init` which initializes a `OnceCell`'ed with a provided TaskPool. Failing if the pool has already been initialized.
* Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
* Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull the a cloned global handle as the resource.
This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.
One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However this potentially leads to potential footgun situations where long running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00
|
|
|
#[derive(Debug)]
|
2020-08-29 19:35:41 +00:00
|
|
|
pub struct TaskPool {
|
|
|
|
/// The executor for the pool
|
|
|
|
///
|
|
|
|
/// This has to be separate from TaskPoolInner because we have to create an Arc<Executor> to
|
2021-03-11 00:27:30 +00:00
|
|
|
/// pass into the worker threads, and we must create the worker threads before we can create
|
|
|
|
/// the Vec<Task<T>> contained within TaskPoolInner
|
2020-09-20 18:27:24 +00:00
|
|
|
executor: Arc<async_executor::Executor<'static>>,
|
2020-08-29 19:35:41 +00:00
|
|
|
|
|
|
|
/// Inner state of the pool
|
Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load.
Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.
This PR makes does the following:
* Adds `*TaskPool::init` which initializes a `OnceCell`'ed with a provided TaskPool. Failing if the pool has already been initialized.
* Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
* Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull the a cloned global handle as the resource.
This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.
One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However this potentially leads to potential footgun situations where long running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00
|
|
|
threads: Vec<JoinHandle<()>>,
|
|
|
|
shutdown_tx: async_channel::Sender<()>,
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl TaskPool {
|
2021-01-18 21:48:28 +00:00
|
|
|
thread_local! {
|
|
|
|
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
|
|
|
|
}
|
|
|
|
|
2020-08-29 19:35:41 +00:00
|
|
|
/// Create a `TaskPool` with the default configuration.
|
|
|
|
pub fn new() -> Self {
|
|
|
|
TaskPoolBuilder::new().build()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn new_internal(
|
|
|
|
num_threads: Option<usize>,
|
|
|
|
stack_size: Option<usize>,
|
|
|
|
thread_name: Option<&str>,
|
|
|
|
) -> Self {
|
2020-09-09 20:12:50 +00:00
|
|
|
let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();
|
|
|
|
|
|
|
|
let executor = Arc::new(async_executor::Executor::new());
|
2020-08-29 19:35:41 +00:00
|
|
|
|
2022-09-19 15:46:03 +00:00
|
|
|
let num_threads = num_threads.unwrap_or_else(crate::available_parallelism);
|
2020-08-29 19:35:41 +00:00
|
|
|
|
|
|
|
let threads = (0..num_threads)
|
|
|
|
.map(|i| {
|
|
|
|
let ex = Arc::clone(&executor);
|
2020-09-09 20:12:50 +00:00
|
|
|
let shutdown_rx = shutdown_rx.clone();
|
2020-08-29 19:35:41 +00:00
|
|
|
|
2022-06-26 21:28:00 +00:00
|
|
|
let thread_name = if let Some(thread_name) = thread_name {
|
|
|
|
format!("{} ({})", thread_name, i)
|
|
|
|
} else {
|
|
|
|
format!("TaskPool ({})", i)
|
2020-08-29 19:35:41 +00:00
|
|
|
};
|
2022-06-26 21:28:00 +00:00
|
|
|
let mut thread_builder = thread::Builder::new().name(thread_name);
|
2020-08-29 19:35:41 +00:00
|
|
|
|
|
|
|
if let Some(stack_size) = stack_size {
|
|
|
|
thread_builder = thread_builder.stack_size(stack_size);
|
|
|
|
}
|
|
|
|
|
2020-09-09 20:12:50 +00:00
|
|
|
thread_builder
|
2020-08-29 19:35:41 +00:00
|
|
|
.spawn(move || {
|
2022-10-24 13:46:40 +00:00
|
|
|
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
|
|
|
|
let tick_forever = async move {
|
|
|
|
loop {
|
|
|
|
local_executor.tick().await;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
|
|
|
|
// Use unwrap_err because we expect a Closed error
|
|
|
|
future::block_on(shutdown_future).unwrap_err();
|
|
|
|
});
|
2020-08-29 19:35:41 +00:00
|
|
|
})
|
2020-12-02 19:31:16 +00:00
|
|
|
.expect("Failed to spawn thread.")
|
2020-08-29 19:35:41 +00:00
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
Self {
|
|
|
|
executor,
|
Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load.
Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.
This PR makes does the following:
* Adds `*TaskPool::init` which initializes a `OnceCell`'ed with a provided TaskPool. Failing if the pool has already been initialized.
* Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
* Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull the a cloned global handle as the resource.
This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.
One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However this potentially leads to potential footgun situations where long running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00
|
|
|
threads,
|
|
|
|
shutdown_tx,
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Return the number of threads owned by the task pool
|
|
|
|
pub fn thread_num(&self) -> usize {
|
Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load.
Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.
This PR makes does the following:
* Adds `*TaskPool::init` which initializes a `OnceCell`'ed with a provided TaskPool. Failing if the pool has already been initialized.
* Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
* Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull the a cloned global handle as the resource.
This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.
One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However this potentially leads to potential footgun situations where long running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00
|
|
|
self.threads.len()
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
2021-12-29 17:38:13 +00:00
|
|
|
/// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
|
2020-08-29 19:35:41 +00:00
|
|
|
/// passing a scope object into it. The scope object provided to the callback can be used
|
|
|
|
/// to spawn tasks. This function will await the completion of all tasks before returning.
|
|
|
|
///
|
|
|
|
/// This is similar to `rayon::scope` and `crossbeam::scope`
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
///
|
|
|
|
/// # Example
|
|
|
|
///
|
|
|
|
/// ```
|
|
|
|
/// use bevy_tasks::TaskPool;
|
|
|
|
///
|
|
|
|
/// let pool = TaskPool::new();
|
|
|
|
/// let mut x = 0;
|
|
|
|
/// let results = pool.scope(|s| {
|
|
|
|
/// s.spawn(async {
|
|
|
|
/// // you can borrow the spawner inside a task and spawn tasks from within the task
|
|
|
|
/// s.spawn(async {
|
|
|
|
/// // borrow x and mutate it.
|
|
|
|
/// x = 2;
|
|
|
|
/// // return a value from the task
|
|
|
|
/// 1
|
|
|
|
/// });
|
|
|
|
/// // return some other value from the first task
|
|
|
|
/// 0
|
|
|
|
/// });
|
|
|
|
/// });
|
|
|
|
///
|
2022-10-20 20:23:57 +00:00
|
|
|
/// // The ordering of results is non-deterministic if you spawn from within tasks as above.
|
|
|
|
/// // If you're doing this, you'll have to write your code to not depend on the ordering.
|
|
|
|
/// assert!(results.contains(&0));
|
|
|
|
/// assert!(results.contains(&1));
|
|
|
|
///
|
|
|
|
/// // The ordering is deterministic if you only spawn directly from the closure function.
|
|
|
|
/// let results = pool.scope(|s| {
|
|
|
|
/// s.spawn(async { 0 });
|
|
|
|
/// s.spawn(async { 1 });
|
|
|
|
/// });
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
/// assert_eq!(&results[..], &[0, 1]);
|
2022-10-20 20:23:57 +00:00
|
|
|
///
|
|
|
|
/// // You can access x after scope runs, since it was only temporarily borrowed in the scope.
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
/// assert_eq!(x, 2);
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// # Lifetimes
|
|
|
|
///
|
|
|
|
/// The [`Scope`] object takes two lifetimes: `'scope` and `'env`.
|
|
|
|
///
|
|
|
|
/// The `'scope` lifetime represents the lifetime of the scope. That is the time during
|
|
|
|
/// which the provided closure and tasks that are spawned into the scope are run.
|
|
|
|
///
|
|
|
|
/// The `'env` lifetime represents the lifetime of whatever is borrowed by the scope.
|
|
|
|
/// Thus this lifetime must outlive `'scope`.
|
|
|
|
///
|
|
|
|
/// ```compile_fail
|
|
|
|
/// use bevy_tasks::TaskPool;
|
|
|
|
/// fn scope_escapes_closure() {
|
|
|
|
/// let pool = TaskPool::new();
|
|
|
|
/// let foo = Box::new(42);
|
|
|
|
/// pool.scope(|scope| {
|
|
|
|
/// std::thread::spawn(move || {
|
|
|
|
/// // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped.
|
|
|
|
/// scope.spawn(async move {
|
|
|
|
/// assert_eq!(*foo, 42);
|
|
|
|
/// });
|
|
|
|
/// });
|
|
|
|
/// });
|
|
|
|
/// }
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// ```compile_fail
|
|
|
|
/// use bevy_tasks::TaskPool;
|
|
|
|
/// fn cannot_borrow_from_closure() {
|
|
|
|
/// let pool = TaskPool::new();
|
|
|
|
/// pool.scope(|scope| {
|
|
|
|
/// let x = 1;
|
|
|
|
/// let y = &x;
|
|
|
|
/// scope.spawn(async move {
|
|
|
|
/// assert_eq!(*y, 1);
|
|
|
|
/// });
|
|
|
|
/// });
|
|
|
|
/// }
|
|
|
|
/// ```
|
|
|
|
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
|
2020-08-29 19:35:41 +00:00
|
|
|
where
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>),
|
2020-08-29 19:35:41 +00:00
|
|
|
T: Send + 'static,
|
|
|
|
{
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
// SAFETY: This safety comment applies to all references transmuted to 'env.
|
|
|
|
// Any futures spawned with these references need to return before this function completes.
|
|
|
|
// This is guaranteed because we drive all the futures spawned onto the Scope
|
|
|
|
// to completion in this function. However, rust has no way of knowing this so we
|
|
|
|
// transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
|
|
|
|
let executor: &async_executor::Executor = &*self.executor;
|
|
|
|
let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) };
|
|
|
|
let task_scope_executor = &async_executor::Executor::default();
|
|
|
|
let task_scope_executor: &'env async_executor::Executor =
|
|
|
|
unsafe { mem::transmute(task_scope_executor) };
|
|
|
|
let spawned: ConcurrentQueue<async_executor::Task<T>> = ConcurrentQueue::unbounded();
|
|
|
|
let spawned_ref: &'env ConcurrentQueue<async_executor::Task<T>> =
|
|
|
|
unsafe { mem::transmute(&spawned) };
|
|
|
|
|
|
|
|
let scope = Scope {
|
|
|
|
executor,
|
|
|
|
task_scope_executor,
|
|
|
|
spawned: spawned_ref,
|
|
|
|
scope: PhantomData,
|
|
|
|
env: PhantomData,
|
|
|
|
};
|
|
|
|
|
|
|
|
let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) };
|
|
|
|
|
|
|
|
f(scope_ref);
|
|
|
|
|
|
|
|
if spawned.is_empty() {
|
|
|
|
Vec::new()
|
|
|
|
} else {
|
|
|
|
let get_results = async move {
|
|
|
|
let mut results = Vec::with_capacity(spawned.len());
|
|
|
|
while let Ok(task) = spawned.pop() {
|
|
|
|
results.push(task.await);
|
|
|
|
}
|
2020-08-29 19:35:41 +00:00
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
results
|
|
|
|
};
|
2020-11-27 20:14:44 +00:00
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
// Pin the futures on the stack.
|
|
|
|
pin!(get_results);
|
|
|
|
|
|
|
|
// SAFETY: This function blocks until all futures complete, so we do not read/write
|
|
|
|
// the data from futures outside of the 'scope lifetime. However,
|
|
|
|
// rust has no way of knowing this so we must convert to 'static
|
|
|
|
// here to appease the compiler as it is unable to validate safety.
|
|
|
|
let get_results: Pin<&mut (dyn Future<Output = Vec<T>> + 'static + Send)> = get_results;
|
|
|
|
let get_results: Pin<&'static mut (dyn Future<Output = Vec<T>> + 'static + Send)> =
|
|
|
|
unsafe { mem::transmute(get_results) };
|
|
|
|
|
|
|
|
// The thread that calls scope() will participate in driving tasks in the pool
|
|
|
|
// forward until the tasks that are spawned by this scope() call
|
|
|
|
// complete. (If the caller of scope() happens to be a thread in
|
|
|
|
// this thread pool, and we only have one thread in the pool, then
|
|
|
|
// simply calling future::block_on(spawned) would deadlock.)
|
|
|
|
let mut spawned = task_scope_executor.spawn(get_results);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
if let Some(result) = future::block_on(future::poll_once(&mut spawned)) {
|
|
|
|
break result;
|
2021-01-18 21:48:28 +00:00
|
|
|
};
|
2020-08-29 19:35:41 +00:00
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
self.executor.try_tick();
|
|
|
|
task_scope_executor.try_tick();
|
2020-11-26 02:05:55 +00:00
|
|
|
}
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
}
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
|
|
|
|
/// cancelled and "detached" allowing it to continue running without having to be polled by the
|
|
|
|
/// end-user.
|
2022-01-16 04:53:22 +00:00
|
|
|
///
|
|
|
|
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
|
2020-09-22 03:23:09 +00:00
|
|
|
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>
|
2020-08-29 19:35:41 +00:00
|
|
|
where
|
|
|
|
T: Send + 'static,
|
|
|
|
{
|
2020-09-22 03:23:09 +00:00
|
|
|
Task::new(self.executor.spawn(future))
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
2021-01-18 21:48:28 +00:00
|
|
|
|
2022-01-16 04:53:22 +00:00
|
|
|
/// Spawns a static future on the thread-local async executor for the current thread. The task
|
|
|
|
/// will run entirely on the thread the task was spawned on. The returned Task is a future.
|
|
|
|
/// It can also be cancelled and "detached" allowing it to continue running without having
|
|
|
|
/// to be polled by the end-user. Users should generally prefer to use [`TaskPool::spawn`]
|
|
|
|
/// instead, unless the provided future is not `Send`.
|
2021-01-18 21:48:28 +00:00
|
|
|
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
|
|
|
|
where
|
|
|
|
T: 'static,
|
|
|
|
{
|
|
|
|
Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
|
|
|
|
}
|
2022-10-24 13:46:40 +00:00
|
|
|
|
|
|
|
/// Runs a function with the local executor. Typically used to tick
|
|
|
|
/// the local executor on the main thread as it needs to share time with
|
|
|
|
/// other things.
|
|
|
|
///
|
|
|
|
/// ```rust
|
|
|
|
/// use bevy_tasks::TaskPool;
|
|
|
|
///
|
|
|
|
/// TaskPool::new().with_local_executor(|local_executor| {
|
|
|
|
/// local_executor.try_tick();
|
|
|
|
/// });
|
|
|
|
/// ```
|
|
|
|
pub fn with_local_executor<F, R>(&self, f: F) -> R
|
|
|
|
where
|
|
|
|
F: FnOnce(&async_executor::LocalExecutor) -> R,
|
|
|
|
{
|
|
|
|
Self::LOCAL_EXECUTOR.with(f)
|
|
|
|
}
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for TaskPool {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load.
Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.
This PR makes does the following:
* Adds `*TaskPool::init` which initializes a `OnceCell`'ed with a provided TaskPool. Failing if the pool has already been initialized.
* Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
* Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull the a cloned global handle as the resource.
This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.
One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However this potentially leads to potential footgun situations where long running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00
|
|
|
impl Drop for TaskPool {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.shutdown_tx.close();
|
|
|
|
|
|
|
|
let panicking = thread::panicking();
|
|
|
|
for join_handle in self.threads.drain(..) {
|
|
|
|
let res = join_handle.join();
|
|
|
|
if !panicking {
|
|
|
|
res.expect("Task thread panicked while executing.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-16 04:53:22 +00:00
|
|
|
/// A `TaskPool` scope for running one or more non-`'static` futures.
|
|
|
|
///
|
|
|
|
/// For more information, see [`TaskPool::scope`].
|
2020-10-08 18:43:01 +00:00
|
|
|
#[derive(Debug)]
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
pub struct Scope<'scope, 'env: 'scope, T> {
|
2020-09-20 18:27:24 +00:00
|
|
|
executor: &'scope async_executor::Executor<'scope>,
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
task_scope_executor: &'scope async_executor::Executor<'scope>,
|
|
|
|
spawned: &'scope ConcurrentQueue<async_executor::Task<T>>,
|
|
|
|
// make `Scope` invariant over 'scope and 'env
|
|
|
|
scope: PhantomData<&'scope mut &'scope ()>,
|
|
|
|
env: PhantomData<&'env mut &'env ()>,
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> {
|
2022-01-16 04:53:22 +00:00
|
|
|
/// Spawns a scoped future onto the thread pool. The scope *must* outlive
|
|
|
|
/// the provided future. The results of the future will be returned as a part of
|
|
|
|
/// [`TaskPool::scope`]'s return value.
|
|
|
|
///
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
/// For futures that should run on the thread `scope` is called on, [`Scope::spawn_on_scope`] should be used
|
2022-01-16 04:53:22 +00:00
|
|
|
/// instead.
|
|
|
|
///
|
|
|
|
/// For more information, see [`TaskPool::scope`].
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&self, f: Fut) {
|
2020-09-20 18:27:24 +00:00
|
|
|
let task = self.executor.spawn(f);
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from an spawn task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
// ConcurrentQueue only errors when closed or full, but we never
|
|
|
|
// close and use an unbouded queue, so it is safe to unwrap
|
|
|
|
self.spawned.push(task).unwrap();
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
2021-01-18 21:48:28 +00:00
|
|
|
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
/// Spawns a scoped future onto the thread the scope is run on. The scope *must* outlive
|
2022-01-16 04:53:22 +00:00
|
|
|
/// the provided future. The results of the future will be returned as a part of
|
|
|
|
/// [`TaskPool::scope`]'s return value. Users should generally prefer to use
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
/// [`Scope::spawn`] instead, unless the provided future needs to run on the scope's thread.
|
2022-01-16 04:53:22 +00:00
|
|
|
///
|
|
|
|
/// For more information, see [`TaskPool::scope`].
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
pub fn spawn_on_scope<Fut>(&self, f: Fut)
where
    Fut: Future<Output = T> + 'scope + Send,
{
    // Schedule the future on `self.task_scope_executor` and keep its task
    // handle in `self.spawned` so the result can be collected later.
    //
    // ConcurrentQueue only errors when it is closed or full; the queue is
    // never closed and is unbounded, so the push cannot fail and it is
    // safe to unwrap.
    self.spawned
        .push(self.task_scope_executor.spawn(f))
        .unwrap();
}
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
2022-09-18 23:52:01 +00:00
|
|
|
#[allow(clippy::disallowed_types)]
|
2020-08-29 19:35:41 +00:00
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2021-01-18 21:48:28 +00:00
|
|
|
use std::sync::{
|
|
|
|
atomic::{AtomicBool, AtomicI32, Ordering},
|
|
|
|
Barrier,
|
|
|
|
};
|
2020-08-29 19:35:41 +00:00
|
|
|
|
|
|
|
#[test]
fn test_spawn() {
    let pool = TaskPool::new();

    // Borrow a heap value so the spawned futures capture a non-'static reference.
    let boxed = Box::new(42);
    let value = &*boxed;

    let count = Arc::new(AtomicI32::new(0));

    let outputs = pool.scope(|scope| {
        for _ in 0..100 {
            let counter = Arc::clone(&count);
            scope.spawn(async move {
                if *value == 42 {
                    counter.fetch_add(1, Ordering::Relaxed);
                    *value
                } else {
                    panic!("not 42!?!?")
                }
            });
        }
    });

    // Every task must have returned the borrowed value...
    for output in &outputs {
        assert_eq!(*output, 42);
    }

    // ...and every one of the 100 tasks must have run exactly once.
    assert_eq!(outputs.len(), 100);
    assert_eq!(count.load(Ordering::Relaxed), 100);
}
|
2021-01-18 21:48:28 +00:00
|
|
|
|
|
|
|
#[test]
fn test_mixed_spawn_on_scope_and_spawn() {
    let pool = TaskPool::new();

    // Borrow a heap value so the spawned futures capture a non-'static reference.
    let boxed = Box::new(42);
    let value = &*boxed;

    let local_count = Arc::new(AtomicI32::new(0));
    let non_local_count = Arc::new(AtomicI32::new(0));

    let outputs = pool.scope(|scope| {
        for i in 0..100 {
            // Even iterations use `spawn` and bump `non_local_count`;
            // odd iterations use `spawn_on_scope` and bump `local_count`.
            let use_spawn = i % 2 == 0;
            let counter = if use_spawn {
                Arc::clone(&non_local_count)
            } else {
                Arc::clone(&local_count)
            };

            let task = async move {
                if *value == 42 {
                    counter.fetch_add(1, Ordering::Relaxed);
                    *value
                } else {
                    panic!("not 42!?!?")
                }
            };

            if use_spawn {
                scope.spawn(task);
            } else {
                scope.spawn_on_scope(task);
            }
        }
    });

    for output in &outputs {
        assert_eq!(*output, 42);
    }

    // 100 results in total, split evenly between the two spawn flavours.
    assert_eq!(outputs.len(), 100);
    assert_eq!(local_count.load(Ordering::Relaxed), 50);
    assert_eq!(non_local_count.load(Ordering::Relaxed), 50);
}
|
|
|
|
|
|
|
|
#[test]
fn test_thread_locality() {
    use std::thread;

    let pool = Arc::new(TaskPool::new());
    let count = Arc::new(AtomicI32::new(0));
    // 101 participants: the 100 worker threads below plus this test thread.
    let barrier = Arc::new(Barrier::new(101));
    let thread_check_failed = Arc::new(AtomicBool::new(false));

    for _ in 0..100 {
        let worker_barrier = Arc::clone(&barrier);
        let worker_count = Arc::clone(&count);
        let worker_pool = Arc::clone(&pool);
        let worker_check_failed = Arc::clone(&thread_check_failed);

        thread::spawn(move || {
            worker_pool.scope(|scope| {
                let spawn_counter = Arc::clone(&worker_count);
                scope.spawn(async move {
                    spawn_counter.fetch_add(1, Ordering::Release);
                });

                let spawner = thread::current().id();
                let on_scope_counter = Arc::clone(&worker_count);
                scope.spawn_on_scope(async move {
                    on_scope_counter.fetch_add(1, Ordering::Release);
                    if thread::current().id() != spawner {
                        // NOTE: This check is using an atomic rather than simply panicking the
                        // thread to avoid deadlocking the barrier on failure
                        worker_check_failed.store(true, Ordering::Release);
                    }
                });
            });
            worker_barrier.wait();
        });
    }

    // Block until every worker thread has finished its scope.
    barrier.wait();

    assert!(!thread_check_failed.load(Ordering::Acquire));
    // Each of the 100 threads spawns two tasks that increment the counter once.
    assert_eq!(count.load(Ordering::Acquire), 200);
}
|
Nested spawns on scope (#4466)
# Objective
- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301
## Solution
- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---
## Changelog
Add ability to nest spawns
```rust
fn main() {
let pool = TaskPool::new();
pool.scope(|scope| {
scope.spawn(async move {
// calling scope.spawn from a spawned task was not possible before
scope.spawn(async move {
// do something
});
});
})
}
```
## Migration Guide
If you were using explicit lifetimes and passing `Scope`, you'll need to specify two lifetimes now.
```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```
`scope.spawn_local` changed to `scope.spawn_on_scope`. This should cover cases where you needed to run tasks on the local thread, but does not cover spawning non-Send futures.
## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests
Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
2022-09-28 01:59:10 +00:00
|
|
|
|
|
|
|
/// Checks that a task running on a scope can itself spawn further tasks onto
/// that same scope, and that every spawned task (outer and nested) contributes
/// its result to the scope's output.
#[test]
fn test_nested_spawn() {
    let pool = TaskPool::new();

    // A borrow of heap data owned by this function: the spawned futures read
    // through this reference rather than through an owned or `'static` value.
    let boxed = Box::new(42);
    let foo = &*boxed;

    let count = Arc::new(AtomicI32::new(0));

    let outputs: Vec<i32> = pool.scope(|scope| {
        for _ in 0..10 {
            let outer_counter = Arc::clone(&count);
            scope.spawn(async move {
                for _ in 0..10 {
                    let inner_counter = Arc::clone(&outer_counter);
                    // Nested spawn: issued from inside an already-spawned task.
                    scope.spawn(async move {
                        if *foo == 42 {
                            inner_counter.fetch_add(1, Ordering::Relaxed);
                            *foo
                        } else {
                            panic!("not 42!?!?")
                        }
                    });
                }
                *foo
            });
        }
    });

    outputs.iter().for_each(|output| assert_eq!(*output, 42));

    // 10 outer tasks plus 10 * 10 nested tasks each yield one output.
    assert_eq!(outputs.len(), 110);
    // Only the nested tasks bump the counter.
    assert_eq!(count.load(Ordering::Relaxed), 100);
}
|
|
|
|
|
|
|
|
/// Checks that a future passed to `spawn_on_scope` from inside another spawned
/// task still runs on the thread that created the scope.
#[test]
fn test_nested_locality() {
    let pool = Arc::new(TaskPool::new());
    let count = Arc::new(AtomicI32::new(0));
    // 100 spawned threads plus the test thread itself.
    let barrier = Arc::new(Barrier::new(101));
    let thread_check_failed = Arc::new(AtomicBool::new(false));

    for _ in 0..100 {
        let worker_barrier = Arc::clone(&barrier);
        let worker_count = Arc::clone(&count);
        let worker_pool = Arc::clone(&pool);
        let worker_check_failed = Arc::clone(&thread_check_failed);
        std::thread::spawn(move || {
            worker_pool.scope(|scope| {
                // Identity of the thread that owns this scope.
                let spawner = std::thread::current().id();
                let task_count = Arc::clone(&worker_count);
                scope.spawn(async move {
                    task_count.fetch_add(1, Ordering::Release);

                    // spawning on the scope from another thread runs the futures on the scope's thread
                    scope.spawn_on_scope(async move {
                        task_count.fetch_add(1, Ordering::Release);
                        if std::thread::current().id() != spawner {
                            // NOTE: This check is using an atomic rather than simply panicking the
                            // thread to avoid deadlocking the barrier on failure
                            worker_check_failed.store(true, Ordering::Release);
                        }
                    });
                });
            });
            worker_barrier.wait();
        });
    }

    // Released only once every worker thread has finished its scope.
    barrier.wait();
    assert!(!thread_check_failed.load(Ordering::Acquire));
    // Each of the 100 workers increments twice: once per nesting level.
    assert_eq!(count.load(Ordering::Acquire), 200);
}
|
2020-08-29 19:35:41 +00:00
|
|
|
}
|