diff --git a/src/iothread.cpp b/src/iothread.cpp index 6ce2fec9a..3e862e354 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -140,12 +140,12 @@ struct thread_pool_t { /// Leaked to avoid shutdown dtor registration (including tsan). static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS)); -static owning_lock> s_result_queue; +static owning_lock> s_result_queue; // "Do on main thread" support. // The queue of main thread requests. This queue contains pointers to structs that are // stack-allocated on the requesting thread. -static owning_lock> s_main_thread_request_queue; +static owning_lock> s_main_thread_request_queue; // Pipes used for notifying. struct notify_pipes_t { @@ -198,7 +198,7 @@ maybe_t thread_pool_t::dequeue_work_or_commit_to_exit() { } static void enqueue_thread_result(void_function_t req) { - s_result_queue.acquire()->push(std::move(req)); + s_result_queue.acquire()->push_back(std::move(req)); const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; int notify_fd = get_notify_pipes().write; assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); @@ -369,14 +369,12 @@ static void iothread_service_main_thread_requests() { ASSERT_IS_MAIN_THREAD(); // Move the queue to a local variable. - std::queue request_queue; + std::vector request_queue; s_main_thread_request_queue.acquire()->swap(request_queue); // Perform each of the functions. Note we are NOT responsible for deleting these. They are // stack allocated in their respective threads! - while (!request_queue.empty()) { - main_thread_request_t *req = request_queue.front(); - request_queue.pop(); + for (main_thread_request_t *req : request_queue) { req->func(); req->done.set_value(); } @@ -385,17 +383,13 @@ static void iothread_service_main_thread_requests() { // Service the queue of results static void iothread_service_result_queue() { // Move the queue to a local variable. - std::queue result_queue; + std::vector result_queue; s_result_queue.acquire()->swap(result_queue); // Perform each completion in order - while (!result_queue.empty()) { - void_function_t req(std::move(result_queue.front())); - result_queue.pop(); + for (const auto &func : result_queue) { // ensure we don't invoke empty functions, that raises an exception - if (req != nullptr) { - req(); - } + if (func) func(); } } @@ -409,7 +403,7 @@ void iothread_perform_on_main(void_function_t &&func) { main_thread_request_t req(std::move(func)); // Append it. Ensure we don't hold the lock after. - s_main_thread_request_queue.acquire()->push(&req); + s_main_thread_request_queue.acquire()->push_back(&req); // Tell the pipe and then wait until our future is set. const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;