fix: ensure that we retain the correct sandoxed arena when spawning Futures (#2965)

This commit is contained in:
Greg Johnston 2024-09-14 09:10:01 -04:00 committed by GitHub
parent d7881ccfb5
commit 6166f6edbd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 21 additions and 11 deletions

View file

@ -235,7 +235,7 @@ where
self.input.try_update(|inp| *inp = Some(input)); self.input.try_update(|inp| *inp = Some(input));
// Spawn the task // Spawn the task
Executor::spawn({ crate::spawn({
let input = self.input.clone(); let input = self.input.clone();
let version = self.version.clone(); let version = self.version.clone();
let value = self.value.clone(); let value = self.value.clone();

View file

@ -5,7 +5,6 @@ use crate::{
traits::{DefinedAt, Dispose, GetUntracked, Set, Update}, traits::{DefinedAt, Dispose, GetUntracked, Set, Update},
unwrap_signal, unwrap_signal,
}; };
use any_spawner::Executor;
use std::{fmt::Debug, future::Future, panic::Location, pin::Pin, sync::Arc}; use std::{fmt::Debug, future::Future, panic::Location, pin::Pin, sync::Arc};
/// An action that synchronizes multiple imperative `async` calls to the reactive system, /// An action that synchronizes multiple imperative `async` calls to the reactive system,
@ -507,7 +506,7 @@ where
let version = self.version.clone(); let version = self.version.clone();
Executor::spawn(async move { crate::spawn(async move {
let new_value = fut.await; let new_value = fut.await;
let canceled = submission.canceled.get_untracked(); let canceled = submission.canceled.get_untracked();
if !canceled { if !canceled {

View file

@ -571,7 +571,7 @@ impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
if self.value.blocking_read().is_none() { if self.value.blocking_read().is_none() {
let handle = suspense_context.task_id(); let handle = suspense_context.task_id();
let ready = SpecialNonReactiveFuture::new(self.ready()); let ready = SpecialNonReactiveFuture::new(self.ready());
Executor::spawn(async move { crate::spawn(async move {
ready.await; ready.await;
drop(handle); drop(handle);
}); });

View file

@ -358,7 +358,7 @@ impl Effect<SyncStorage> {
let mut first_run = true; let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>)); let value = Arc::new(RwLock::new(None::<T>));
Executor::spawn({ crate::spawn({
let value = Arc::clone(&value); let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber(); let subscriber = inner.to_any_subscriber();
@ -403,7 +403,7 @@ impl Effect<SyncStorage> {
let mut first_run = true; let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>)); let value = Arc::new(RwLock::new(None::<T>));
Executor::spawn({ let task = {
let value = Arc::clone(&value); let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber(); let subscriber = inner.to_any_subscriber();
@ -425,7 +425,9 @@ impl Effect<SyncStorage> {
} }
} }
} }
}); };
crate::spawn(task);
Self { Self {
inner: Some(StoredValue::new_with_storage(Some(inner))), inner: Some(StoredValue::new_with_storage(Some(inner))),
@ -451,7 +453,7 @@ impl Effect<SyncStorage> {
let watch_value = Arc::new(RwLock::new(None::<T>)); let watch_value = Arc::new(RwLock::new(None::<T>));
let inner = cfg!(feature = "effects").then(|| { let inner = cfg!(feature = "effects").then(|| {
Executor::spawn({ crate::spawn({
let dep_value = Arc::clone(&dep_value); let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value); let watch_value = Arc::clone(&watch_value);
let subscriber = inner.to_any_subscriber(); let subscriber = inner.to_any_subscriber();

View file

@ -153,7 +153,7 @@ where
.with(|| inner.to_any_subscriber().with_observer(|| fun(None))); .with(|| inner.to_any_subscriber().with_observer(|| fun(None)));
*value.write().or_poisoned() = Some(initial_value); *value.write().or_poisoned() = Some(initial_value);
Executor::spawn({ crate::spawn({
let value = Arc::clone(&value); let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber(); let subscriber = inner.to_any_subscriber();

View file

@ -71,7 +71,7 @@
#![cfg_attr(feature = "nightly", feature(fn_traits))] #![cfg_attr(feature = "nightly", feature(fn_traits))]
#![deny(missing_docs)] #![deny(missing_docs)]
use std::fmt::Arguments; use std::{fmt::Arguments, future::Future};
pub mod actions; pub mod actions;
pub(crate) mod channel; pub(crate) mod channel;
@ -120,3 +120,12 @@ fn log_warning(text: Arguments) {
eprintln!("{}", text); eprintln!("{}", text);
} }
} }
/// Calls [`Executor::spawn`], but ensures that the task also runs in the current arena, if
/// multithreaded arena sandboxing is enabled.
pub(crate) fn spawn(task: impl Future<Output = ()> + Send + 'static) {
#[cfg(feature = "sandboxed-arenas")]
let task = owner::Sandboxed::new(task);
any_spawner::Executor::spawn(task);
}

View file

@ -587,7 +587,7 @@ where
fn from_stream(stream: impl Stream<Item = T> + Send + 'static) -> Self { fn from_stream(stream: impl Stream<Item = T> + Send + 'static) -> Self {
let (read, write) = arc_signal(None); let (read, write) = arc_signal(None);
let mut stream = Box::pin(stream); let mut stream = Box::pin(stream);
Executor::spawn(async move { crate::spawn(async move {
while let Some(value) = stream.next().await { while let Some(value) = stream.next().await {
write.set(Some(value)); write.set(Some(value));
} }