From e2782ac3227ebb3b9d8ba8c943927c57106596c4 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sun, 19 Jun 2022 14:19:17 -0700 Subject: [PATCH] Remove iothread_perform_on_main iothread_perform_on_main is deadlock-prone under concurrent execution. We no longer use it, so remove it. --- src/fish_tests.cpp | 34 ++++++++++++---------------- src/iothread.cpp | 56 +++++----------------------------------------- src/iothread.h | 3 --- 3 files changed, 19 insertions(+), 74 deletions(-) diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index bc76e25c0..943c2434c 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -788,19 +788,6 @@ static void test_tokenizer() { err(L"redirection_type_for_string failed on line %ld", (long)__LINE__); } -// Little function that runs in a background thread, bouncing to the main. -static int test_iothread_thread_call(std::atomic *addr) { - int before = *addr; - iothread_perform_on_main([=]() { *addr += 1; }); - int after = *addr; - - // Must have incremented it at least once. - if (before >= after) { - err(L"Failed to increment from background thread"); - } - return after; -} - static void test_fd_monitor() { say(L"Testing fd_monitor"); @@ -935,17 +922,24 @@ static void test_fd_monitor() { static void test_iothread() { say(L"Testing iothreads"); - std::unique_ptr> int_ptr = make_unique>(0); - int iterations = 64; + std::atomic shared_int{0}; + const int iterations = 64; + std::promise prom; for (int i = 0; i < iterations; i++) { - iothread_perform([&]() { test_iothread_thread_call(int_ptr.get()); }); + iothread_perform([&] { + int newv = 1 + shared_int.fetch_add(1, std::memory_order_relaxed); + if (newv == iterations) { + prom.set_value(); + } + }); } - iothread_drain_all(); + auto status = prom.get_future().wait_for(std::chrono::seconds(64)); // Should have incremented it once per thread. - do_test(*int_ptr == iterations); - if (*int_ptr != iterations) { - say(L"Expected int to be %d, but instead it was %d", iterations, int_ptr->load()); + do_test(status == std::future_status::ready); + do_test(shared_int == iterations); + if (shared_int != iterations) { + say(L"Expected int to be %d, but instead it was %d", iterations, shared_int.load()); } } diff --git a/src/iothread.cpp b/src/iothread.cpp index 1ba046627..be6f0253e 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -107,28 +107,7 @@ struct thread_pool_t : noncopyable_t, nonmovable_t { static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS)); /// A queue of "things to do on the main thread." -struct main_thread_queue_t : noncopyable_t { - // Functions to invoke as the completion callback from debounce. - std::vector completions; - - // iothread_perform_on_main requests. - // Note this contains pointers to structs that are stack-allocated on the requesting thread. - std::vector requests; - - /// Transfer ownership of ourselves to a new queue and return it. - /// 'this' is left empty. - main_thread_queue_t take() { - main_thread_queue_t result; - std::swap(result.completions, this->completions); - std::swap(result.requests, this->requests); - return result; - } - - // Moving is allowed, but not copying. - main_thread_queue_t() = default; - main_thread_queue_t(main_thread_queue_t &&) = default; - main_thread_queue_t &operator=(main_thread_queue_t &&) = default; -}; +using main_thread_queue_t = std::vector; static owning_lock s_main_thread_queue; /// \return the signaller for completions and main thread requests. @@ -300,39 +279,14 @@ void iothread_service_main() { // Move the queue to a local variable. // Note the s_main_thread_queue lock is not held after this. - main_thread_queue_t queue = s_main_thread_queue.acquire()->take(); + main_thread_queue_t queue; + s_main_thread_queue.acquire()->swap(queue); // Perform each completion in order. - for (const void_function_t &func : queue.completions) { + for (const void_function_t &func : queue) { // ensure we don't invoke empty functions, that raises an exception if (func) func(); } - - // Perform each main thread request. - for (const void_function_t &func : queue.requests) { - if (func) func(); - } -} - -void iothread_perform_on_main(const void_function_t &func) { - if (is_main_thread()) { - func(); - return; - } - - // Make a new request. Note we are synchronous, so our closure can use references instead of - // copying. - std::promise wait_until_done; - auto handler = [&] { - func(); - wait_until_done.set_value(); - }; - // Append it. Ensure we don't hold the lock after. - s_main_thread_queue.acquire()->requests.emplace_back(std::move(handler)); - - // Tell the signaller and then wait until our future is set. - get_notify_signaller().post(); - wait_until_done.get_future().wait(); } bool make_detached_pthread(void *(*func)(void *), void *param) { @@ -501,7 +455,7 @@ uint64_t debounce_t::perform(std::function handler) { // static void debounce_t::enqueue_main_thread_result(std::function func) { - s_main_thread_queue.acquire()->completions.push_back(std::move(func)); + s_main_thread_queue.acquire()->push_back(std::move(func)); get_notify_signaller().post(); } diff --git a/src/iothread.h b/src/iothread.h index 1012ae7a1..cbe9abda8 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -40,9 +40,6 @@ inline void iothread_perform_cantwait(std::function &&func) { iothread_perform_impl(std::move(func), true); } -/// Performs a function on the main thread, blocking until it completes. -void iothread_perform_on_main(const std::function &func); - /// Creates a pthread, manipulating the signal mask so that the thread receives no signals. /// The thread is detached. /// The pthread runs \p func.