diff --git a/examples/todo_app_sqlite_axum/src/todo.rs b/examples/todo_app_sqlite_axum/src/todo.rs index 784d69837..73eea7a07 100644 --- a/examples/todo_app_sqlite_axum/src/todo.rs +++ b/examples/todo_app_sqlite_axum/src/todo.rs @@ -106,7 +106,9 @@ pub fn TodoApp() -> impl IntoView { #[component] pub fn Todos() -> impl IntoView { - //let add_todo = create_server_multi_action::(); + let add_todo = create_server_multi_action::(); + let delete_todo = ServerAction::::new(); + let submissions = add_todo.submissions(); let delete_todo = ServerAction::::new(); //let submissions = add_todo.submissions(); diff --git a/reactive_graph/src/action.rs b/reactive_graph/src/actions/action.rs similarity index 83% rename from reactive_graph/src/action.rs rename to reactive_graph/src/actions/action.rs index c306e6922..14ac62d3b 100644 --- a/reactive_graph/src/action.rs +++ b/reactive_graph/src/actions/action.rs @@ -1,63 +1,13 @@ use crate::{ - computed::AsyncState, diagnostics::is_suppressing_resource_load, owner::{Owner, StoredValue}, - signal::{ArcReadSignal, ArcRwSignal, ReadSignal, RwSignal}, - traits::{DefinedAt, GetUntracked, Set, Update, WithUntracked}, + signal::{ArcRwSignal, RwSignal}, + traits::{DefinedAt, GetUntracked, Update}, unwrap_signal, }; use any_spawner::Executor; -use futures::{ - channel::oneshot, - select, - stream::{AbortRegistration, Abortable}, - FutureExt, -}; -use std::{ - future::Future, - mem::swap, - panic::Location, - pin::Pin, - sync::{atomic::AtomicUsize, Arc}, -}; - -/*enum ActionState { - Idle, - Loading(I), - LoadingMultiple(I, usize), - Complete(O), - Reloading(I, O), - ReloadingMultiple(I, O, usize), -} - -impl ActionState { - fn currently_loading(&self) -> usize { - match self { - ActionState::Idle => 0, - ActionState::Loading(_) => 1, - ActionState::LoadingMultiple(_, curr) => *curr, - ActionState::Complete(_) => 0, - ActionState::Reloading(_, _) => 1, - ActionState::ReloadingMultiple(_, _, curr) => *curr, - } - } -}*/ - -struct ActionInner { - input: Option, - value: Option, - version: usize, -} - -impl Default for ActionInner { - fn default() -> Self { - Self { - input: Default::default(), - value: Default::default(), - version: Default::default(), - } - } -} +use futures::{channel::oneshot, select, FutureExt}; +use std::{future::Future, panic::Location, pin::Pin, sync::Arc}; pub struct ArcAction where diff --git a/reactive_graph/src/actions/mod.rs b/reactive_graph/src/actions/mod.rs new file mode 100644 index 000000000..72a92b260 --- /dev/null +++ b/reactive_graph/src/actions/mod.rs @@ -0,0 +1,4 @@ +mod action; +mod multi_action; +pub use action::*; +pub use multi_action::*; diff --git a/reactive_graph/src/actions/multi_action.rs b/reactive_graph/src/actions/multi_action.rs new file mode 100644 index 000000000..c5c5c8806 --- /dev/null +++ b/reactive_graph/src/actions/multi_action.rs @@ -0,0 +1,264 @@ +use crate::{ + diagnostics::is_suppressing_resource_load, + owner::StoredValue, + signal::{ArcReadSignal, ArcRwSignal, ReadSignal, RwSignal}, + traits::{GetUntracked, Set, Update}, +}; +use any_spawner::Executor; +use std::{future::Future, pin::Pin, sync::Arc}; + +pub struct MultiAction +where + I: 'static, + O: 'static, +{ + inner: StoredValue>, +} + +impl Copy for MultiAction +where + I: 'static, + O: 'static, +{ +} + +impl Clone for MultiAction +where + I: 'static, + O: 'static, +{ + fn clone(&self) -> Self { + *self + } +} + +impl MultiAction +where + I: Send + Sync + 'static, + O: Send + Sync + 'static, +{ + #[track_caller] + pub fn new(action_fn: impl Fn(&I) -> Fut + 'static) -> Self + where + Fut: Future + Send + 'static, + ArcMultiAction: Send + Sync, + { + Self { + inner: StoredValue::new(ArcMultiAction::new(action_fn)), + } + } + + /// Calls the `async` function with a reference to the input type as its argument. + pub fn dispatch(&self, input: I) { + if !is_suppressing_resource_load() { + self.inner.with_value(|inner| inner.dispatch(input)); + } + } + + /// The set of all submissions to this multi-action. + pub fn submissions(&self) -> ReadSignal>> { + todo!() + } + + /// How many times an action has successfully resolved. + pub fn version(&self) -> RwSignal { + todo!() + } +} + +pub struct ArcMultiAction +where + I: 'static, + O: 'static, +{ + version: ArcRwSignal, + submissions: ArcRwSignal>>, + #[allow(clippy::complexity)] + action_fn: Arc Pin + Send>>>, +} + +impl ArcMultiAction +where + I: 'static, + O: 'static, +{ + pub fn new(action_fn: impl Fn(&I) -> Fut + 'static) -> Self + where + Fut: Future + Send + 'static, + { + let action_fn = Arc::new(move |input: &I| { + let fut = action_fn(input); + Box::pin(fut) as Pin + Send>> + }); + Self { + version: ArcRwSignal::new(0), + submissions: ArcRwSignal::new(Vec::new()), + action_fn, + } + } + + /// Calls the `async` function with a reference to the input type as its argument. + pub fn dispatch(&self, input: I) { + if !is_suppressing_resource_load() { + let fut = (self.action_fn)(&input); + + let submission = ArcSubmission { + input: ArcRwSignal::new(Some(input)), + value: ArcRwSignal::new(None), + pending: ArcRwSignal::new(true), + canceled: ArcRwSignal::new(false), + }; + + self.submissions + .try_update(|subs| subs.push(submission.clone())); + + let version = self.version.clone(); + + Executor::spawn_local(async move { + let new_value = fut.await; + let canceled = submission.canceled.get_untracked(); + if !canceled { + submission.value.try_set(Some(new_value)); + } + submission.input.try_set(None); + submission.pending.try_set(false); + version.try_update(|n| *n += 1); + }) + } + } + + /// The set of all submissions to this multi-action. + pub fn submissions(&self) -> ArcReadSignal>> { + self.submissions.read_only() + } + + /// How many times an action has successfully resolved. + pub fn version(&self) -> ArcRwSignal { + self.version.clone() + } +} + +/// An action that has been submitted by dispatching it to a [MultiAction](crate::MultiAction). +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ArcSubmission +where + I: 'static, + O: 'static, +{ + /// The current argument that was dispatched to the `async` function. + /// `Some` while we are waiting for it to resolve, `None` if it has resolved. + input: ArcRwSignal>, + /// The most recent return value of the `async` function. + value: ArcRwSignal>, + pending: ArcRwSignal, + /// Controls this submission has been canceled. + canceled: ArcRwSignal, +} + +impl ArcSubmission +where + I: 'static, + O: 'static, +{ + pub fn input(&self) -> ArcReadSignal> { + self.input.read_only() + } + + pub fn value(&self) -> ArcReadSignal> { + self.value.read_only() + } + + pub fn pending(&self) -> ArcReadSignal { + self.pending.read_only() + } + + pub fn canceled(&self) -> ArcReadSignal { + self.canceled.read_only() + } + + pub fn cancel(&self) { + self.canceled.try_set(true); + } +} + +impl Clone for ArcSubmission { + fn clone(&self) -> Self { + Self { + input: self.input.clone(), + value: self.value.clone(), + pending: self.pending.clone(), + canceled: self.canceled.clone(), + } + } +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Submission +where + I: 'static, + O: 'static, +{ + /// The current argument that was dispatched to the `async` function. + /// `Some` while we are waiting for it to resolve, `None` if it has resolved. + input: RwSignal>, + /// The most recent return value of the `async` function. + value: RwSignal>, + pending: RwSignal, + /// Controls this submission has been canceled. + canceled: RwSignal, +} + +impl From> for Submission +where + I: Send + Sync + 'static, + O: Send + Sync + 'static, +{ + fn from(value: ArcSubmission) -> Self { + let ArcSubmission { + input, + value, + pending, + canceled, + } = value; + Self { + input: input.into(), + value: value.into(), + pending: pending.into(), + canceled: canceled.into(), + } + } +} + +impl Submission +where + I: Send + Sync + 'static, + O: Send + Sync + 'static, +{ + pub fn input(&self) -> ReadSignal> { + self.input.read_only() + } + + pub fn value(&self) -> ReadSignal> { + self.value.read_only() + } + + pub fn pending(&self) -> ReadSignal { + self.pending.read_only() + } + + pub fn canceled(&self) -> ReadSignal { + self.canceled.read_only() + } + + pub fn cancel(&self) { + self.canceled.try_set(true); + } +} + +impl Clone for Submission { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for Submission {} diff --git a/reactive_graph/src/lib.rs b/reactive_graph/src/lib.rs index bcae9c72d..a36d5dd4c 100644 --- a/reactive_graph/src/lib.rs +++ b/reactive_graph/src/lib.rs @@ -72,7 +72,7 @@ use futures::Stream; use std::{future::Future, pin::Pin}; -pub mod action; +pub mod actions; pub(crate) mod channel; pub mod computed; pub mod diagnostics;