Use futures in perform_on_main_thread

Replace the complicated implementation which shared a condition variable, with
one which just uses std::future<void>. This may allocate more condition
variables but is much simpler.
This commit is contained in:
ridiculousfish 2021-02-06 15:36:21 -08:00
parent ae1c53cc19
commit 76833cf6af

View file

@ -15,6 +15,7 @@
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <functional> #include <functional>
#include <future>
#include <queue> #include <queue>
#include <thread> #include <thread>
@ -57,14 +58,18 @@ struct work_request_t {
}; };
struct main_thread_request_t { struct main_thread_request_t {
relaxed_atomic_bool_t done{false}; // The function to execute.
void_function_t func; void_function_t func;
// Set by the main thread when the work is done.
std::promise<void> done{};
explicit main_thread_request_t(void_function_t &&f) : func(f) {} explicit main_thread_request_t(void_function_t &&f) : func(f) {}
// No moving OR copying // No moving OR copying
// main_thread_requests are always stack allocated, and we deal in pointers to them // main_thread_requests are always stack allocated, and we deal in pointers to them
void operator=(const main_thread_request_t &) = delete; void operator=(const main_thread_request_t &) = delete;
void operator=(main_thread_request_t &&) = delete;
main_thread_request_t(const main_thread_request_t &) = delete; main_thread_request_t(const main_thread_request_t &) = delete;
main_thread_request_t(main_thread_request_t &&) = delete; main_thread_request_t(main_thread_request_t &&) = delete;
}; };
@ -138,11 +143,8 @@ static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS))
static owning_lock<std::queue<void_function_t>> s_result_queue; static owning_lock<std::queue<void_function_t>> s_result_queue;
// "Do on main thread" support. // "Do on main thread" support.
static std::mutex s_main_thread_performer_lock; // protects the main thread requests // The queue of main thread requests. This queue contains pointers to structs that are
static std::condition_variable s_main_thread_performer_cond; // protects the main thread requests // stack-allocated on the requesting thread.
/// The queue of main thread requests. This queue contains pointers to structs that are
/// stack-allocated on the requesting thread.
static owning_lock<std::queue<main_thread_request_t *>> s_main_thread_request_queue; static owning_lock<std::queue<main_thread_request_t *>> s_main_thread_request_queue;
// Pipes used for notifying. // Pipes used for notifying.
@ -370,28 +372,13 @@ static void iothread_service_main_thread_requests() {
std::queue<main_thread_request_t *> request_queue; std::queue<main_thread_request_t *> request_queue;
s_main_thread_request_queue.acquire()->swap(request_queue); s_main_thread_request_queue.acquire()->swap(request_queue);
if (!request_queue.empty()) { // Perform each of the functions. Note we are NOT responsible for deleting these. They are
// Perform each of the functions. Note we are NOT responsible for deleting these. They are // stack allocated in their respective threads!
// stack allocated in their respective threads! while (!request_queue.empty()) {
while (!request_queue.empty()) { main_thread_request_t *req = request_queue.front();
main_thread_request_t *req = request_queue.front(); request_queue.pop();
request_queue.pop(); req->func();
req->func(); req->done.set_value();
req->done = true;
}
// Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked.
// Note we must do this while holding the lock. Otherwise we race with the waiting threads:
//
// 1. waiting thread checks for done, sees false
// 2. main thread performs request, sets done to true, posts to condition
// 3. waiting thread unlocks lock, waits on condition (forever)
//
// Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid
// posting before the waiting thread is waiting.
// TODO: revisit this logic, this feels sketchy.
scoped_lock broadcast_lock(s_main_thread_performer_lock);
s_main_thread_performer_cond.notify_all();
} }
} }
@ -424,21 +411,11 @@ void iothread_perform_on_main(void_function_t &&func) {
// Append it. Ensure we don't hold the lock after. // Append it. Ensure we don't hold the lock after.
s_main_thread_request_queue.acquire()->push(&req); s_main_thread_request_queue.acquire()->push(&req);
// Tell the pipe. // Tell the pipe and then wait until our future is set.
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE; const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
int notify_fd = get_notify_pipes().write; int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
req.done.get_future().wait();
// Wait on the condition, until we're done.
std::unique_lock<std::mutex> perform_lock(s_main_thread_performer_lock);
while (!req.done) {
// It would be nice to support checking for cancellation here, but the clients need a
// deterministic way to clean up to avoid leaks
s_main_thread_performer_cond.wait(perform_lock);
}
// Ok, the request must now be done.
assert(req.done);
} }
bool make_detached_pthread(void *(*func)(void *), void *param) { bool make_detached_pthread(void *(*func)(void *), void *param) {