diff --git a/fish-rust/src/threads.rs b/fish-rust/src/threads.rs index 82a5e4c75..b5d4620e1 100644 --- a/fish-rust/src/threads.rs +++ b/fish-rust/src/threads.rs @@ -133,12 +133,20 @@ fn iothread_perform_cant_wait_ffi(callback: &cxx::SharedPtr) { /// A [`ThreadPool`] or [`Debounce`] work request. type WorkItem = Box; -/// The queue of [`WorkItem`]s to be executed on the main thread. This is added to from -/// [`Debounce::enqueue_main_thread()`] and read from in `iothread_service_main()`. +// A helper type to allow us to (temporarily) send an object to another thread. +struct ForceSend(T); + +// Safety: only used on main thread. +unsafe impl Send for ForceSend {} + +#[allow(clippy::type_complexity)] +type DebounceCallback = ForceSend>; + +/// The queue of [`WorkItem`]s to be executed on the main thread. This is read from in +/// `iothread_service_main()`. /// -/// Since items are enqueued from various background threads then read by the main thread, the work -/// items must implement `Send`. -static MAIN_THREAD_QUEUE: Mutex> = Mutex::new(Vec::new()); +/// Since the queue is synchronized, items don't need to implement `Send`. +static MAIN_THREAD_QUEUE: Mutex> = Mutex::new(Vec::new()); /// Initialize some global static variables. Must be called at startup from the main thread. pub fn init() { @@ -566,12 +574,11 @@ pub fn iothread_service_main() { // posting uses the opposite order. NOTIFY_SIGNALLER.try_consume(); - // Move the queue to a local variable. The MAIN_THREAD_QUEUE lock is not held after this. let queue = std::mem::take(&mut *MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!")); // Perform each completion in order. - for func in queue { - (func)(); + for callback in queue { + (callback.0)(); } } @@ -666,8 +673,7 @@ impl Debounce { /// /// The result is a token which is only of interest to the test suite. pub fn perform(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 { - let h = Box::new(handler); - self.perform_inner(h) + self.perform_with_completion(handler, |_result| ()) } fn perform_ffi(&self, callback: &cxx::SharedPtr) -> u64 { @@ -707,24 +713,28 @@ impl Debounce { pub fn perform_with_completion(&self, handler: H, completion: C) -> NonZeroU64 where H: FnOnce() -> R + 'static + Send, - C: FnOnce(R) + 'static + Send, + C: FnOnce(R) + 'static, R: 'static + Send, { - let h = Box::new(move || { + assert_is_main_thread(); + let completion_wrapper = ForceSend(completion); + let work_item = Box::new(move || { let result = handler(); - let c = Box::new(move || { - (completion)(result); - }); - Self::enqueue_main_thread_result(c); + let callback: DebounceCallback = ForceSend(Box::new(move || { + let completion = completion_wrapper; + (completion.0)(result); + })); + MAIN_THREAD_QUEUE.lock().unwrap().push(callback); + NOTIFY_SIGNALLER.post(); }); - self.perform_inner(h) + self.perform_inner(work_item) } - fn perform_inner(&self, handler: WorkItem) -> NonZeroU64 { + fn perform_inner(&self, work_item: WorkItem) -> NonZeroU64 { let mut spawn = false; let active_token = { let mut data = self.data.lock().expect("Mutex poisoned!"); - data.next_req = Some(handler); + data.next_req = Some(work_item); // If we have a timeout and our running thread has exceeded it, abandon that thread. if data.active_token.is_some() && !self.timeout.is_zero() @@ -757,12 +767,6 @@ impl Debounce { active_token } - - /// Static helper to add a [`WorkItem`] to [`MAIN_THREAD_ID`] and signal [`NOTIFY_SIGNALLER`]. - fn enqueue_main_thread_result(f: WorkItem) { - MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!").push(f); - NOTIFY_SIGNALLER.post(); - } } #[test]