From 216f7d912ab8e93eaa9c131293e114ac459acba4 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 23 Jan 2017 09:56:02 -0800 Subject: [PATCH] Clean up some of the memory management in iothread.cpp Store requests directly on the queue, instead of via a heap allocation --- src/iothread.cpp | 110 +++++++++++++++++++++++++---------------------- src/iothread.h | 4 +- 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/src/iothread.cpp b/src/iothread.cpp index acbad20ab..56ef5416b 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -33,32 +33,48 @@ static void iothread_service_main_thread_requests(void); static void iothread_service_result_queue(); struct spawn_request_t { - int (*handler)(void *); - void (*completionCallback)(void *, int); - void *context; - int handlerResult; + int (*handler)(void *) = NULL; + void (*completion)(void *, int) = NULL; + void *context = NULL; + int handler_result = -1; + + spawn_request_t() {} + + // Move-only + spawn_request_t &operator=(const spawn_request_t &) = delete; + spawn_request_t &operator=(spawn_request_t &&) = default; + spawn_request_t(const spawn_request_t &) = delete; + spawn_request_t(spawn_request_t &&) = default; }; struct main_thread_request_t { - int (*handler)(void *); - void *context; - volatile int handlerResult; - volatile bool done; + int (*handler)(void *) = NULL; + void *context = NULL; + volatile int handler_result = -1; + volatile bool done = false; + + main_thread_request_t() {} + + // No moving OR copying + // main_thread_requests are always stack allocated, and we deal in pointers to them + void operator=(const spawn_request_t &) = delete; + main_thread_request_t(const spawn_request_t &) = delete; + main_thread_request_t(spawn_request_t &&) = delete; }; // Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue, // at which point they can be deallocated. s_active_thread_count is also protected by the lock. -static pthread_mutex_t s_spawn_queue_lock; -static std::queue s_request_queue; +static pthread_mutex_t s_spawn_queue_lock = PTHREAD_MUTEX_INITIALIZER; +static std::queue s_request_queue; static int s_active_thread_count; -static pthread_mutex_t s_result_queue_lock; -static std::queue s_result_queue; +static pthread_mutex_t s_result_queue_lock = PTHREAD_MUTEX_INITIALIZER; +static std::queue s_result_queue; // "Do on main thread" support. -static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests +static pthread_mutex_t s_main_thread_performer_lock = PTHREAD_MUTEX_INITIALIZER; // protects the main thread requests static pthread_cond_t s_main_thread_performer_cond; // protects the main thread requests -static pthread_mutex_t s_main_thread_request_q_lock; // protects the queue +static pthread_mutex_t s_main_thread_request_q_lock = PTHREAD_MUTEX_INITIALIZER; // protects the queue static std::queue s_main_thread_request_queue; // Notifying pipes. @@ -70,10 +86,6 @@ static void iothread_init(void) { inited = true; // Initialize some locks. - VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL)); - VOMIT_ON_FAILURE(pthread_mutex_init(&s_result_queue_lock, NULL)); - VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_q_lock, NULL)); - VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL)); VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_cond, NULL)); // Initialize the completion pipes. @@ -89,24 +101,24 @@ static void iothread_init(void) { } } -static void add_to_queue(struct spawn_request_t *req) { +static void add_to_queue(struct spawn_request_t req) { ASSERT_IS_LOCKED(s_spawn_queue_lock); - s_request_queue.push(req); + s_request_queue.push(std::move(req)); } -static spawn_request_t *dequeue_spawn_request(void) { +static bool dequeue_spawn_request(spawn_request_t *result) { ASSERT_IS_LOCKED(s_spawn_queue_lock); - spawn_request_t *result = NULL; if (!s_request_queue.empty()) { - result = s_request_queue.front(); + *result = std::move(s_request_queue.front()); s_request_queue.pop(); + return true; } - return result; + return false; } -static void enqueue_thread_result(spawn_request_t *req) { +static void enqueue_thread_result(spawn_request_t req) { scoped_lock locker(s_result_queue_lock); - s_result_queue.push(req); + s_result_queue.push(std::move(req)); } static void *this_thread() { return (void *)(intptr_t)pthread_self(); } @@ -115,22 +127,19 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); } static void *iothread_worker(void *unused) { UNUSED(unused); scoped_lock locker(s_spawn_queue_lock); - struct spawn_request_t *req; - while ((req = dequeue_spawn_request()) != NULL) { - debug(5, "pthread %p dequeued %p\n", this_thread(), req); + struct spawn_request_t req; + while (dequeue_spawn_request(&req)) { + debug(5, "pthread %p dequeued\n", this_thread()); // Unlock the queue while we execute the request. locker.unlock(); // Perform the work. - req->handlerResult = req->handler(req->context); + req.handler_result = req.handler(req.context); - // If there's a completion handler, we have to enqueue it on the result queue. Otherwise, we - // can just delete the request! - if (req->completionCallback == NULL) { - delete req; - } else { + // If there's a completion handler, we have to enqueue it on the result queue. + if (req.completion != NULL) { // Enqueue the result, and tell the main thread about it. - enqueue_thread_result(req); + enqueue_thread_result(std::move(req)); const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte)); } @@ -177,17 +186,17 @@ static void iothread_spawn() { VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL)); } -int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int), +int iothread_perform_base(int (*handler)(void *), void (*completion)(void *, int), void *context) { ASSERT_IS_MAIN_THREAD(); ASSERT_IS_NOT_FORKED_CHILD(); iothread_init(); // Create and initialize a request. - struct spawn_request_t *req = new spawn_request_t(); - req->handler = handler; - req->completionCallback = completionCallback; - req->context = context; + struct spawn_request_t req; + req.handler = handler; + req.completion = completion; + req.context = context; int local_thread_count = -1; bool spawn_new_thread = false; @@ -195,7 +204,7 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi // Lock around a local region. Note that we can only access s_active_thread_count under the // lock. scoped_lock locker(s_spawn_queue_lock); - add_to_queue(req); + add_to_queue(std::move(req)); if (s_active_thread_count < IO_MAX_THREADS) { s_active_thread_count++; spawn_new_thread = true; @@ -297,7 +306,7 @@ static void iothread_service_main_thread_requests(void) { while (!request_queue.empty()) { main_thread_request_t *req = request_queue.front(); request_queue.pop(); - req->handlerResult = req->handler(req->context); + req->handler_result = req->handler(req->context); req->done = true; } @@ -315,10 +324,10 @@ static void iothread_service_main_thread_requests(void) { } } -/* Service the queue of results */ +// Service the queue of results static void iothread_service_result_queue() { // Move the queue to a local variable. - std::queue result_queue; + std::queue result_queue; { scoped_lock queue_lock(s_result_queue_lock); std::swap(result_queue, s_result_queue); @@ -326,12 +335,11 @@ static void iothread_service_result_queue() { // Perform each completion in order. We are responsibile for cleaning them up. while (!result_queue.empty()) { - spawn_request_t *req = result_queue.front(); + spawn_request_t req = std::move(result_queue.front()); result_queue.pop(); - if (req->completionCallback) { - req->completionCallback(req->context, req->handlerResult); + if (req.completion) { + req.completion(req.context, req.handler_result); } - delete req; } } @@ -345,8 +353,6 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context) { main_thread_request_t req; req.handler = handler; req.context = context; - req.handlerResult = 0; - req.done = false; // Append it. Do not delete the nested scope as it is crucial to the proper functioning of this // code by virtue of the lock management. @@ -370,5 +376,5 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context) { // Ok, the request must now be done. assert(req.done); - return req.handlerResult; + return req.handler_result; } diff --git a/src/iothread.h b/src/iothread.h index ec57be683..b384c8da6 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -29,9 +29,9 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context); /// Helper templates. template -int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context) { +int iothread_perform(int (*handler)(T *), void (*completion)(T *, int), T *context) { return iothread_perform_base((int (*)(void *))handler, - (void (*)(void *, int))completionCallback, + (void (*)(void *, int))completion, static_cast(context)); }