Stop requiring Send from debounce completion callbacks

Today, debounce-style work items are only created from the main thread.
The work to compute the result is done in a background thread but the
completion callback is called on the main thread again.

The completion callbacks used by the reader capture a shared reference to
ReaderData, which includes a Parser.  Neither of those types needs to be
sent across threads.

The debounce machinery moves the completion callback into a function object
that is moved to the background thread and back again.  Because of this
there is a Send requirement on the completion callback.

Since we already synchronize on MAIN_THREAD_QUEUE, we don't need Send from
the function object. Lift the requirement.
This commit is contained in:
Johannes Altmanninger 2023-12-10 18:09:13 +01:00
parent 3c12864436
commit d8ac0508f8

View file

@ -133,12 +133,20 @@ fn iothread_perform_cant_wait_ffi(callback: &cxx::SharedPtr<ffi::CppCallback>) {
/// A [`ThreadPool`] or [`Debounce`] work request.
type WorkItem = Box<dyn FnOnce() + 'static + Send>;
/// The queue of [`WorkItem`]s to be executed on the main thread. This is added to from
/// [`Debounce::enqueue_main_thread()`] and read from in `iothread_service_main()`.
// A helper type to allow us to (temporarily) send an object to another thread.
struct ForceSend<T>(T);
// Safety: only used on main thread.
unsafe impl<T> Send for ForceSend<T> {}
#[allow(clippy::type_complexity)]
type DebounceCallback = ForceSend<Box<dyn FnOnce() + 'static>>;
/// The queue of [`WorkItem`]s to be executed on the main thread. This is read from in
/// `iothread_service_main()`.
///
/// Since items are enqueued from various background threads then read by the main thread, the work
/// items must implement `Send`.
static MAIN_THREAD_QUEUE: Mutex<Vec<WorkItem>> = Mutex::new(Vec::new());
/// Since the queue is synchronized, items don't need to implement `Send`.
static MAIN_THREAD_QUEUE: Mutex<Vec<DebounceCallback>> = Mutex::new(Vec::new());
/// Initialize some global static variables. Must be called at startup from the main thread.
pub fn init() {
@ -566,12 +574,11 @@ pub fn iothread_service_main() {
// posting uses the opposite order.
NOTIFY_SIGNALLER.try_consume();
// Move the queue to a local variable. The MAIN_THREAD_QUEUE lock is not held after this.
let queue = std::mem::take(&mut *MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!"));
// Perform each completion in order.
for func in queue {
(func)();
for callback in queue {
(callback.0)();
}
}
@ -666,8 +673,7 @@ impl Debounce {
///
/// The result is a token which is only of interest to the test suite.
pub fn perform(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 {
let h = Box::new(handler);
self.perform_inner(h)
self.perform_with_completion(handler, |_result| ())
}
fn perform_ffi(&self, callback: &cxx::SharedPtr<ffi::CppCallback>) -> u64 {
@ -707,24 +713,28 @@ impl Debounce {
pub fn perform_with_completion<H, R, C>(&self, handler: H, completion: C) -> NonZeroU64
where
H: FnOnce() -> R + 'static + Send,
C: FnOnce(R) + 'static + Send,
C: FnOnce(R) + 'static,
R: 'static + Send,
{
let h = Box::new(move || {
assert_is_main_thread();
let completion_wrapper = ForceSend(completion);
let work_item = Box::new(move || {
let result = handler();
let c = Box::new(move || {
(completion)(result);
let callback: DebounceCallback = ForceSend(Box::new(move || {
let completion = completion_wrapper;
(completion.0)(result);
}));
MAIN_THREAD_QUEUE.lock().unwrap().push(callback);
NOTIFY_SIGNALLER.post();
});
Self::enqueue_main_thread_result(c);
});
self.perform_inner(h)
self.perform_inner(work_item)
}
fn perform_inner(&self, handler: WorkItem) -> NonZeroU64 {
fn perform_inner(&self, work_item: WorkItem) -> NonZeroU64 {
let mut spawn = false;
let active_token = {
let mut data = self.data.lock().expect("Mutex poisoned!");
data.next_req = Some(handler);
data.next_req = Some(work_item);
// If we have a timeout and our running thread has exceeded it, abandon that thread.
if data.active_token.is_some()
&& !self.timeout.is_zero()
@ -757,12 +767,6 @@ impl Debounce {
active_token
}
/// Static helper to add a [`WorkItem`] to [`MAIN_THREAD_ID`] and signal [`NOTIFY_SIGNALLER`].
fn enqueue_main_thread_result(f: WorkItem) {
MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!").push(f);
NOTIFY_SIGNALLER.post();
}
}
#[test]