Remove iothread_perform_on_main

iothread_perform_on_main is deadlock-prone under concurrent execution.
We no longer use it, so remove it.
This commit is contained in:
ridiculousfish 2022-06-19 14:19:17 -07:00
parent bfa83470d4
commit e2782ac322
3 changed files with 19 additions and 74 deletions

View file

@ -788,19 +788,6 @@ static void test_tokenizer() {
err(L"redirection_type_for_string failed on line %ld", (long)__LINE__);
}
// Little function that runs in a background thread, bouncing to the main.
static int test_iothread_thread_call(std::atomic<int> *addr) {
int before = *addr;
iothread_perform_on_main([=]() { *addr += 1; });
int after = *addr;
// Must have incremented it at least once.
if (before >= after) {
err(L"Failed to increment from background thread");
}
return after;
}
static void test_fd_monitor() {
say(L"Testing fd_monitor");
@ -935,17 +922,24 @@ static void test_fd_monitor() {
static void test_iothread() {
say(L"Testing iothreads");
std::unique_ptr<std::atomic<int>> int_ptr = make_unique<std::atomic<int>>(0);
int iterations = 64;
std::atomic<int> shared_int{0};
const int iterations = 64;
std::promise<void> prom;
for (int i = 0; i < iterations; i++) {
iothread_perform([&]() { test_iothread_thread_call(int_ptr.get()); });
iothread_perform([&] {
int newv = 1 + shared_int.fetch_add(1, std::memory_order_relaxed);
if (newv == iterations) {
prom.set_value();
}
});
}
iothread_drain_all();
auto status = prom.get_future().wait_for(std::chrono::seconds(64));
// Should have incremented it once per thread.
do_test(*int_ptr == iterations);
if (*int_ptr != iterations) {
say(L"Expected int to be %d, but instead it was %d", iterations, int_ptr->load());
do_test(status == std::future_status::ready);
do_test(shared_int == iterations);
if (shared_int != iterations) {
say(L"Expected int to be %d, but instead it was %d", iterations, shared_int.load());
}
}

View file

@ -107,28 +107,7 @@ struct thread_pool_t : noncopyable_t, nonmovable_t {
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));
/// A queue of "things to do on the main thread."
struct main_thread_queue_t : noncopyable_t {
// Functions to invoke as the completion callback from debounce.
std::vector<void_function_t> completions;
// iothread_perform_on_main requests.
// Note this contains pointers to structs that are stack-allocated on the requesting thread.
std::vector<void_function_t> requests;
/// Transfer ownership of ourselves to a new queue and return it.
/// 'this' is left empty.
main_thread_queue_t take() {
main_thread_queue_t result;
std::swap(result.completions, this->completions);
std::swap(result.requests, this->requests);
return result;
}
// Moving is allowed, but not copying.
main_thread_queue_t() = default;
main_thread_queue_t(main_thread_queue_t &&) = default;
main_thread_queue_t &operator=(main_thread_queue_t &&) = default;
};
using main_thread_queue_t = std::vector<void_function_t>;
static owning_lock<main_thread_queue_t> s_main_thread_queue;
/// \return the signaller for completions and main thread requests.
@ -300,39 +279,14 @@ void iothread_service_main() {
// Move the queue to a local variable.
// Note the s_main_thread_queue lock is not held after this.
main_thread_queue_t queue = s_main_thread_queue.acquire()->take();
main_thread_queue_t queue;
s_main_thread_queue.acquire()->swap(queue);
// Perform each completion in order.
for (const void_function_t &func : queue.completions) {
for (const void_function_t &func : queue) {
// ensure we don't invoke empty functions, that raises an exception
if (func) func();
}
// Perform each main thread request.
for (const void_function_t &func : queue.requests) {
if (func) func();
}
}
void iothread_perform_on_main(const void_function_t &func) {
if (is_main_thread()) {
func();
return;
}
// Make a new request. Note we are synchronous, so our closure can use references instead of
// copying.
std::promise<void> wait_until_done;
auto handler = [&] {
func();
wait_until_done.set_value();
};
// Append it. Ensure we don't hold the lock after.
s_main_thread_queue.acquire()->requests.emplace_back(std::move(handler));
// Tell the signaller and then wait until our future is set.
get_notify_signaller().post();
wait_until_done.get_future().wait();
}
bool make_detached_pthread(void *(*func)(void *), void *param) {
@ -501,7 +455,7 @@ uint64_t debounce_t::perform(std::function<void()> handler) {
// static
void debounce_t::enqueue_main_thread_result(std::function<void()> func) {
s_main_thread_queue.acquire()->completions.push_back(std::move(func));
s_main_thread_queue.acquire()->push_back(std::move(func));
get_notify_signaller().post();
}

View file

@ -40,9 +40,6 @@ inline void iothread_perform_cantwait(std::function<void()> &&func) {
iothread_perform_impl(std::move(func), true);
}
/// Performs a function on the main thread, blocking until it completes.
void iothread_perform_on_main(const std::function<void()> &func);
/// Creates a pthread, manipulating the signal mask so that the thread receives no signals.
/// The thread is detached.
/// The pthread runs \p func.