Use vectors, not queues, in iothread main thread requests

queues use std::deque under the hood which is more expensive than a vector.
We always consume the entire queue so there is no advantage to use deque here.
Just use a vector.
This commit is contained in:
ridiculousfish 2021-02-06 15:43:00 -08:00
parent 76833cf6af
commit aac5862a67

View file

@ -140,12 +140,12 @@ struct thread_pool_t {
/// Leaked to avoid shutdown dtor registration (including tsan).
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));
static owning_lock<std::queue<void_function_t>> s_result_queue;
static owning_lock<std::vector<void_function_t>> s_result_queue;
// "Do on main thread" support.
// The queue of main thread requests. This queue contains pointers to structs that are
// stack-allocated on the requesting thread.
static owning_lock<std::queue<main_thread_request_t *>> s_main_thread_request_queue;
static owning_lock<std::vector<main_thread_request_t *>> s_main_thread_request_queue;
// Pipes used for notifying.
struct notify_pipes_t {
@ -198,7 +198,7 @@ maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
}
static void enqueue_thread_result(void_function_t req) {
s_result_queue.acquire()->push(std::move(req));
s_result_queue.acquire()->push_back(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
@ -369,14 +369,12 @@ static void iothread_service_main_thread_requests() {
ASSERT_IS_MAIN_THREAD();
// Move the queue to a local variable.
std::queue<main_thread_request_t *> request_queue;
std::vector<main_thread_request_t *> request_queue;
s_main_thread_request_queue.acquire()->swap(request_queue);
// Perform each of the functions. Note we are NOT responsible for deleting these. They are
// stack allocated in their respective threads!
while (!request_queue.empty()) {
main_thread_request_t *req = request_queue.front();
request_queue.pop();
for (main_thread_request_t *req : request_queue) {
req->func();
req->done.set_value();
}
@ -385,17 +383,13 @@ static void iothread_service_main_thread_requests() {
// Service the queue of results
static void iothread_service_result_queue() {
// Move the queue to a local variable.
std::queue<void_function_t> result_queue;
std::vector<void_function_t> result_queue;
s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order
while (!result_queue.empty()) {
void_function_t req(std::move(result_queue.front()));
result_queue.pop();
for (const auto &func : result_queue) {
// ensure we don't invoke empty functions, that raises an exception
if (req != nullptr) {
req();
}
if (func) func();
}
}
@ -409,7 +403,7 @@ void iothread_perform_on_main(void_function_t &&func) {
main_thread_request_t req(std::move(func));
// Append it. Ensure we don't hold the lock after.
s_main_thread_request_queue.acquire()->push(&req);
s_main_thread_request_queue.acquire()->push_back(&req);
// Tell the pipe and then wait until our future is set.
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;