feat: create generic any_spawner crate to share between reactive system and renderer

This commit is contained in:
Greg Johnston 2024-02-16 20:37:59 -05:00
parent 6a2eafcbc6
commit 8252c4a977
19 changed files with 137 additions and 158 deletions

View file

@ -6,6 +6,7 @@ members = [
"const_str_slice_concat",
"next_tuple",
"or_poisoned",
"any_spawner",
# core
"hydration_context",
@ -38,6 +39,7 @@ version = "0.6.13"
rust-version = "1.75"
[workspace.dependencies]
any_spawner = { path = "./any_spawner/" }
const_str_slice_concat = { path = "./const_str_slice_concat" }
hydration_context = { path = "./hydration_context" }
leptos = { path = "./leptos", version = "0.6.5" }

23
any_spawner/Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
[package]
name = "any_spawner"
edition = "2021"
version.workspace = true
[dependencies]
futures = "0.3"
glib = { version = "0.19", optional = true }
thiserror = "1"
tokio = { version = "1", optional = true, default-features = false, features = ["rt"] }
tracing = { version = "0.1", optional = true }
wasm-bindgen-futures = { version = "0.4", optional = true }
[features]
tracing = ["dep:tracing"]
tokio = ["dep:tokio"]
glib = ["dep:glib"]
wasm-bindgen = ["dep:wasm-bindgen-futures"]
futures-executor = ["futures/thread-pool", "futures/executor"]
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

View file

@ -1,25 +1,65 @@
//! Sets and uses a global `async` executor, which is used for scheduling effects.
//! This crate makes it easier to write asynchronous code that is executor-agnostic, by providing a
//! utility that can be used to spawn tasks in a variety of executors.
//!
//! The executor must be set exactly once in a program.
//! It only supports single executor per program, but that executor can be set at runtime, anywhere
//! in your crate (or an application that depends on it).
//!
//! This can be extended to support any executor or runtime that supports spawning [`Future`]s.
//!
//! This is a least common denominator implementation in many ways. Limitations include:
//! - setting an executor is a one-time, global action
//! - no "join handle" or other result is returned from the spawn
//! - the `Future` must output `()`
//!
//! ```rust
//! use any_spawner::Executor;
//!
//! Executor::init_futures_executor()
//! .expect("executor should only be initialized once");
//!
//! // spawn a thread-safe Future
//! Executor::spawn(async { /* ... */ });
//!
//! // spawn a Future that is !Send
//! Executor::spawn_local(async { /* ... */ });
//! ```
use crate::{PinnedFuture, PinnedLocalFuture};
use std::{future::Future, sync::OnceLock};
#![forbid(unsafe_code)]
#![deny(missing_docs)]
use std::{future::Future, pin::Pin, sync::OnceLock};
use thiserror::Error;
pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub(crate) type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
static SPAWN: OnceLock<fn(PinnedFuture<()>)> = OnceLock::new();
static SPAWN_LOCAL: OnceLock<fn(PinnedLocalFuture<()>)> = OnceLock::new();
/// Errors that can occur when using the executor.
#[derive(Error, Debug)]
pub enum ExecutorError {
/// The executor has already been set.
#[error("Executor has already been set.")]
AlreadySet,
}
/// A global async executor that can spawn tasks.
pub struct Executor;
impl Executor {
/// Spawns a thread-safe [`Future`].
/// ```rust
/// use any_spawner::Executor;
///
/// Executor::init_futures_executor()
/// .expect("executor should only be initialized once");
///
/// // spawn a thread-safe Future
/// Executor::spawn(async { /* ... */ });
/// ```
#[track_caller]
pub fn spawn(fut: impl Future<Output = ()> + Send + Sync + 'static) {
pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
if let Some(spawner) = SPAWN.get() {
spawner(Box::pin(fut))
} else {
@ -38,6 +78,16 @@ impl Executor {
}
}
/// Spawns a [`Future`] that cannot be sent across threads.
/// ```rust
/// use any_spawner::Executor;
///
/// Executor::init_futures_executor()
/// .expect("executor should only be initialized once");
///
/// // spawn a thread-safe Future
/// Executor::spawn(async { /* ... */ });
/// ```
#[track_caller]
pub fn spawn_local(fut: impl Future<Output = ()> + 'static) {
if let Some(spawner) = SPAWN_LOCAL.get() {
@ -60,6 +110,11 @@ impl Executor {
}
impl Executor {
/// Globally sets the [`tokio`] runtime as the executor used to spawn tasks.
///
/// Returns `Err(_)` if an executor has already been set.
///
/// Requires the `tokio` feature to be activated on this crate.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub fn init_tokio() -> Result<(), ExecutorError> {
@ -76,6 +131,11 @@ impl Executor {
Ok(())
}
/// Globally sets the [`wasm-bindgen-futures`] runtime as the executor used to spawn tasks.
///
/// Returns `Err(_)` if an executor has already been set.
///
/// Requires the `wasm-bindgen` feature to be activated on this crate.
#[cfg(feature = "wasm-bindgen")]
#[cfg_attr(docsrs, doc(cfg(feature = "wasm-bindgen")))]
pub fn init_wasm_bindgen() -> Result<(), ExecutorError> {
@ -92,6 +152,11 @@ impl Executor {
Ok(())
}
/// Globally sets the [`glib`] runtime as the executor used to spawn tasks.
///
/// Returns `Err(_)` if an executor has already been set.
///
/// Requires the `glib` feature to be activated on this crate.
#[cfg(feature = "glib")]
#[cfg_attr(docsrs, doc(cfg(feature = "glib")))]
pub fn init_glib() -> Result<(), ExecutorError> {
@ -110,6 +175,12 @@ impl Executor {
Ok(())
}
/// Globally sets the [`futures`] executor as the executor used to spawn tasks,
/// lazily creating a thread pool to spawn tasks into.
///
/// Returns `Err(_)` if an executor has already been set.
///
/// Requires the `futures-executor` feature to be activated on this crate.
#[cfg(feature = "futures-executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-executor")))]
pub fn init_futures_executor() -> Result<(), ExecutorError> {
@ -148,3 +219,20 @@ impl Executor {
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::Executor;
use std::rc::Rc;
#[cfg(feature = "futures-executor")]
#[test]
fn can_spawn_local_future() {
Executor::init_futures_executor().expect("couldn't set executor");
let rc = Rc::new(());
Executor::spawn_local(async {
_ = rc;
});
Executor::spawn(async {});
}
}

View file

@ -10,6 +10,7 @@ readme = "../README.md"
rust-version.workspace = true
[dependencies]
any_spawner = { workspace = true, features = ["wasm-bindgen"] }
cfg-if = "1"
leptos_dom = { workspace = true }
leptos_macro = { workspace = true }
@ -18,7 +19,7 @@ leptos_server = { workspace = true }
leptos_config = { workspace = true }
leptos-spin-macro = { version = "0.2", optional = true }
tracing = "0.1"
reactive_graph = { workspace = true, features = ["wasm-bindgen"] }
reactive_graph = { workspace = true }
tachys = { workspace = true, features = ["reactive_graph"] }
typed-builder = "0.18"
typed-builder-macro = "0.18"

View file

@ -4,16 +4,14 @@ edition = "2021"
version.workspace = true
[dependencies]
any_spawner = { workspace = true }
or_poisoned = { workspace = true }
futures = "0.3"
glib = { version = "0.19", optional = true }
pin-project-lite = "0.2"
rustc-hash = "1.1.0"
slotmap = "1"
thiserror = "1"
tokio = { version = "1", optional = true, default-features = false, features = ["rt"] }
tracing = { version = "0.1", optional = true }
wasm-bindgen-futures = { version = "0.4", optional = true }
guardian = "1"
[dev-dependencies]
@ -22,10 +20,6 @@ tokio = { version = "1", features = ["rt-multi-thread", "time", "macros"] }
[features]
nightly = []
tracing = ["dep:tracing"]
tokio = ["dep:tokio"]
glib = ["dep:glib"]
wasm-bindgen = ["dep:wasm-bindgen-futures"]
futures-executor = ["futures/thread-pool", "futures/executor"]
[package.metadata.docs.rs]
all-features = true

View file

@ -4,7 +4,6 @@ use super::{
};
use crate::{
channel::channel,
executor::Executor,
graph::{
AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
SubscriberSet, ToAnySource, ToAnySubscriber,
@ -13,6 +12,7 @@ use crate::{
signal::SignalReadGuard,
traits::{DefinedAt, ReadUntracked},
};
use any_spawner::Executor;
use core::fmt::Debug;
use futures::{FutureExt, StreamExt};
use or_poisoned::OrPoisoned;

View file

@ -1,10 +1,10 @@
use crate::{
channel::{channel, Receiver},
effect::inner::EffectInner,
executor::Executor,
graph::{AnySubscriber, SourceSet, Subscriber, ToAnySubscriber},
owner::Owner,
};
use any_spawner::Executor;
use futures::StreamExt;
use or_poisoned::OrPoisoned;
use std::{

View file

@ -1,10 +1,10 @@
use crate::{
channel::channel,
effect::inner::EffectInner,
executor::Executor,
graph::{AnySubscriber, SourceSet, Subscriber, ToAnySubscriber},
owner::Owner,
};
use any_spawner::Executor;
use futures::StreamExt;
use or_poisoned::OrPoisoned;
use std::{

View file

@ -76,7 +76,6 @@ pub(crate) mod channel;
pub mod computed;
pub mod diagnostics;
pub mod effect;
pub mod executor;
pub mod graph;
pub mod owner;
pub mod selector;

View file

@ -1,6 +1,6 @@
use any_spawner::Executor;
use reactive_graph::{
computed::{ArcAsyncDerived, AsyncDerived, AsyncState},
executor::Executor,
signal::RwSignal,
traits::{Get, Read, Set, With, WithUntracked},
};

View file

@ -1,6 +1,6 @@
use any_spawner::Executor;
use reactive_graph::{
effect::{Effect, RenderEffect},
executor::Executor,
prelude::*,
signal::RwSignal,
};

View file

@ -1,13 +0,0 @@
use reactive_graph::executor::Executor;
use std::rc::Rc;
#[cfg(feature = "futures-executor")]
#[test]
fn futures_executor_test() {
Executor::init_futures_executor().expect("couldn't set executor");
let rc = Rc::new(());
Executor::spawn_local(async {
_ = rc;
});
Executor::spawn(async {});
}

View file

@ -1,7 +1,7 @@
use any_spawner::Executor;
use reactive_graph::{
computed::{ArcMemo, Memo},
effect::Effect,
executor::Executor,
prelude::*,
signal::RwSignal,
};

View file

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
any_spawner = { workspace = true }
const_str_slice_concat = { workspace = true }
next_tuple = { path = "../next_tuple" }
reactive_graph = { workspace = true, optional = true }
@ -135,11 +136,8 @@ web-sys = { version = "0.3", features = [
drain_filter_polyfill = "0.1"
indexmap = "2"
rustc-hash = "1"
tokio = { version = "1", optional = true, features = ["rt"] }
wasm-bindgen-futures = { version = "0.4", optional = true }
futures = "0.3"
parking_lot = "0.12"
pin-project-lite = "0.2"
itertools = "0.12"
send_wrapper = "0.6"
@ -156,5 +154,3 @@ ssr = []
nightly = ["reactive_graph/nightly"]
testing = ["dep:slotmap"]
reactive_graph = ["dep:reactive_graph"]
tokio = ["dep:tokio"]
web = ["dep:wasm-bindgen-futures"]

View file

@ -1,13 +1,13 @@
use crate::{
hydration::Cursor,
renderer::{Renderer, SpawningRenderer},
spawner::Spawner,
renderer::Renderer,
ssr::StreamBuilder,
view::{
either::{Either, EitherState},
Mountable, Position, PositionState, Render, RenderHtml,
},
};
use any_spawner::Executor;
use futures::FutureExt;
use parking_lot::RwLock;
use std::{fmt::Debug, future::Future, sync::Arc};
@ -61,7 +61,7 @@ where
Fal: Render<Rndr> + 'static,
Fut: Future + 'static,
Fut::Output: Render<Rndr>,
Rndr: SpawningRenderer + 'static,
Rndr: Renderer + 'static,
{
type State = Arc<RwLock<EitherState<Fal, Fut::Output, Rndr>>>;
@ -86,7 +86,7 @@ where
// spawning immediately means that our now_or_never poll result isn't lost
// if it wasn't pending at first, we don't need to poll the Future again
if initially_pending {
Rndr::Spawn::spawn_local({
Executor::spawn_local({
let state = Arc::clone(&state);
async move {
let value = fut.as_mut().await;
@ -105,7 +105,7 @@ where
}
// spawn the future, and rebuild the state when it resolves
Rndr::Spawn::spawn_local({
Executor::spawn_local({
let state = Arc::clone(state);
async move {
let value = self.fut.await;
@ -121,7 +121,7 @@ where
Fal: RenderHtml<Rndr> + Send + Sync + 'static,
Fut: Future + Send + Sync + 'static,
Fut::Output: RenderHtml<Rndr>,
Rndr: SpawningRenderer + 'static,
Rndr: Renderer + 'static,
Rndr::Node: Clone,
Rndr::Element: Clone,
{
@ -207,7 +207,7 @@ where
// spawning immediately means that our now_or_never poll result isn't lost
// if it wasn't pending at first, we don't need to poll the Future again
if initially_pending {
Rndr::Spawn::spawn_local({
Executor::spawn_local({
let state = Arc::clone(&state);
async move {
let value = fut.as_mut().await;

View file

@ -16,7 +16,7 @@ pub mod prelude {
element::{ElementChild, InnerHtmlAttribute},
node_ref::NodeRefAttribute,
},
renderer::{dom::Dom, Renderer, SpawningRenderer},
renderer::{dom::Dom, Renderer},
view::{
error_boundary::TryCatchBoundary, Mountable, Render, RenderHtml,
},
@ -33,7 +33,6 @@ pub mod html;
pub mod hydration;
pub mod mathml;
pub mod renderer;
pub mod spawner;
pub mod ssr;
pub mod svg;
pub mod view;

View file

@ -1,5 +1,3 @@
#[cfg(any(feature = "tokio", feature = "web"))]
use super::SpawningRenderer;
use super::{CastFrom, DomRenderer, Renderer};
use crate::{
dom::{document, window},
@ -417,13 +415,3 @@ where
source.dyn_into::<T>().ok()
}
}
#[cfg(feature = "web")]
impl SpawningRenderer for Dom {
type Spawn = crate::spawner::wasm::Wasm;
}
#[cfg(all(feature = "tokio", not(feature = "web")))]
impl SpawningRenderer for Dom {
type Spawn = crate::spawner::tokio::Tokio;
}

View file

@ -1,4 +1,4 @@
use crate::{html::element::CreateElement, spawner::Spawner, view::Mountable};
use crate::{html::element::CreateElement, view::Mountable};
use std::borrow::Cow;
use wasm_bindgen::JsValue;
@ -150,11 +150,6 @@ pub trait DomRenderer: Renderer {
fn set_inner_html(el: &Self::Element, html: &str);
}
/// A renderer that is able to spawn async tasks during rendering.
pub trait SpawningRenderer: Renderer {
type Spawn: Spawner;
}
/// Attempts to cast from one type to another.
///
/// This works in a similar way to `TryFrom`. We implement it as a separate trait

View file

@ -1,93 +0,0 @@
use std::future::Future;
/// Allows spawning a [`Future`] to be run in a separate task.
pub trait Spawner {
fn spawn<Fut>(fut: Fut)
where
Fut: Future + Send + Sync + 'static;
fn spawn_local<Fut>(fut: Fut)
where
Fut: Future + 'static;
}
/// A spawner that will block in place when spawning an async task.
///
/// This is mostly useful for testing, and especially for synchronous-only code.
pub struct BlockSpawn;
impl Spawner for BlockSpawn {
fn spawn<Fut>(fut: Fut)
where
Fut: Future + Send + Sync + 'static,
{
futures::executor::block_on(async move {
fut.await;
});
}
fn spawn_local<Fut>(fut: Fut)
where
Fut: Future + 'static,
{
futures::executor::block_on(async move {
fut.await;
});
}
}
#[cfg(feature = "web")]
pub mod wasm {
use super::Spawner;
use wasm_bindgen_futures::spawn_local;
#[derive(Debug, Copy, Clone)]
pub struct Wasm;
impl Spawner for Wasm {
fn spawn<Fut>(fut: Fut)
where
Fut: futures::Future + Send + Sync + 'static,
{
Self::spawn_local(fut);
}
fn spawn_local<Fut>(fut: Fut)
where
Fut: futures::Future + 'static,
{
spawn_local(async move {
fut.await;
});
}
}
}
#[cfg(feature = "tokio")]
pub mod tokio {
use super::Spawner;
use tokio::task::{spawn, spawn_local};
#[derive(Debug, Copy, Clone)]
pub struct Tokio;
impl Spawner for Tokio {
fn spawn<Fut>(fut: Fut)
where
Fut: futures::Future + Send + Sync + 'static,
{
spawn(async move {
fut.await;
});
}
fn spawn_local<Fut>(fut: Fut)
where
Fut: futures::Future + 'static,
{
spawn_local(async move {
fut.await;
});
}
}
}