diff --git a/src/iothread.cpp b/src/iothread.cpp
index ec032bcee..5ec7f92e7 100644
--- a/src/iothread.cpp
+++ b/src/iothread.cpp
@@ -64,14 +64,13 @@ struct main_thread_request_t {
     main_thread_request_t(main_thread_request_t &&) = delete;
 };
 
-// Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue,
-// at which point they can be deallocated. s_active_thread_count is also protected by the lock.
-static pthread_mutex_t s_spawn_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static std::queue<spawn_request_t> s_request_queue;
-static int s_active_thread_count;
-
-static pthread_mutex_t s_result_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static std::queue<spawn_request_t> s_result_queue;
+// Spawn support. Requests are allocated and come in on request_queue and go out on result_queue.
+struct thread_data_t {
+    std::queue<spawn_request_t> request_queue;
+    int thread_count = 0;
+};
+static owning_lock<thread_data_t> s_spawn_requests;
+static owning_lock<std::queue<spawn_request_t>> s_result_queue;
 
 // "Do on main thread" support.
 static pthread_mutex_t s_main_thread_performer_lock =
@@ -103,24 +102,19 @@ static void iothread_init(void) {
     }
 }
 
-static void add_to_queue(struct spawn_request_t req) {
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    s_request_queue.push(std::move(req));
-}
-
 static bool dequeue_spawn_request(spawn_request_t *result) {
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    if (!s_request_queue.empty()) {
-        *result = std::move(s_request_queue.front());
-        s_request_queue.pop();
+    auto locker = s_spawn_requests.acquire();
+    thread_data_t &td = locker.value;
+    if (!td.request_queue.empty()) {
+        *result = std::move(td.request_queue.front());
+        td.request_queue.pop();
         return true;
     }
     return false;
 }
 
 static void enqueue_thread_result(spawn_request_t req) {
-    scoped_lock locker(s_result_queue_lock);
-    s_result_queue.push(std::move(req));
+    s_result_queue.acquire().value.push(std::move(req));
 }
 
 static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
@@ -128,14 +122,11 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
 /// The function that does thread work.
 static void *iothread_worker(void *unused) {
     UNUSED(unused);
-    scoped_lock locker(s_spawn_queue_lock);
     struct spawn_request_t req;
     while (dequeue_spawn_request(&req)) {
         debug(5, "pthread %p dequeued\n", this_thread());
-        // Unlock the queue while we execute the request.
-        locker.unlock();
 
-        // Perform the work.
+        // Perform the work
         req.handler();
 
         // If there's a completion handler, we have to enqueue it on the result queue.
@@ -146,21 +137,17 @@ static void *iothread_worker(void *unused) {
             const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
             VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
         }
-
-        // Lock us up again.
-        locker.lock();
     }
 
     // We believe we have exhausted the thread request queue. We want to decrement
-    // s_active_thread_count and exit. But it's possible that a request just came in. Furthermore,
-    // it's possible that the main thread saw that s_active_thread_count is full, and decided to not
+    // thread_count and exit. But it's possible that a request just came in. Furthermore,
+    // it's possible that the main thread saw that thread_count is full, and decided to not
     // spawn a new thread, trusting in one of the existing threads to handle it. But we've already
     // committed to not handling anything else. Therefore, we have to decrement
-    // s_active_thread_count under the lock, which we still hold. Likewise, the main thread must
+    // the thread count under the lock. Likewise, the main thread must
     // check the value under the lock.
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    assert(s_active_thread_count > 0);
-    s_active_thread_count -= 1;
+    int new_thread_count = --s_spawn_requests.acquire().value.thread_count;
+    assert(new_thread_count >= 0);
 
     debug(5, "pthread %p exiting\n", this_thread());
     // We're done.
@@ -199,14 +186,14 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion)
     bool spawn_new_thread = false;
     {
         // Lock around a local region.
-        // Note that we can only access s_active_thread_count under the lock.
-        scoped_lock locker(s_spawn_queue_lock);
-        add_to_queue(std::move(req));
-        if (s_active_thread_count < IO_MAX_THREADS) {
-            s_active_thread_count++;
+        auto locker = s_spawn_requests.acquire();
+        thread_data_t &td = locker.value;
+        td.request_queue.push(std::move(req));
+        if (td.thread_count < IO_MAX_THREADS) {
+            td.thread_count++;
             spawn_new_thread = true;
         }
-        local_thread_count = s_active_thread_count;
+        local_thread_count = td.thread_count;
     }
 
     // Kick off the thread if we decided to do so.
@@ -261,21 +248,17 @@ void iothread_drain_all(void) {
     ASSERT_IS_MAIN_THREAD();
     ASSERT_IS_NOT_FORKED_CHILD();
 
-    scoped_lock locker(s_spawn_queue_lock);
-
 #define TIME_DRAIN 0
 #if TIME_DRAIN
-    int thread_count = s_active_thread_count;
+    int thread_count = s_spawn_requests.acquire().value.thread_count;
     double now = timef();
 #endif
 
     // Nasty polling via select().
-    while (s_active_thread_count > 0) {
-        locker.unlock();
+    while (s_spawn_requests.acquire().value.thread_count > 0) {
         if (iothread_wait_for_pending_completions(1000)) {
             iothread_service_completion();
         }
-        locker.lock();
     }
 #if TIME_DRAIN
     double after = timef();
@@ -323,12 +306,9 @@ static void iothread_service_main_thread_requests(void) {
 static void iothread_service_result_queue() {
     // Move the queue to a local variable.
     std::queue<spawn_request_t> result_queue;
-    {
-        scoped_lock queue_lock(s_result_queue_lock);
-        result_queue.swap(s_result_queue);
-    }
+    s_result_queue.acquire().value.swap(result_queue);
 
-    // Perform each completion in order. We are responsibile for cleaning them up.
+    // Perform each completion in order.
     while (!result_queue.empty()) {
         spawn_request_t req = std::move(result_queue.front());
         result_queue.pop();
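
For context: this patch replaces a bare pthread mutex plus ASSERT_IS_LOCKED discipline with fish's owning_lock wrapper, which bundles a mutex together with the data it protects, so the data is only reachable through acquire(). The sketch below is a minimal illustration of that pattern written against std::mutex; the names acquired_lock/owning_lock and the .acquire().value shape match the patch, but the implementation details here are assumptions, not fish's actual common.h code.

#include <mutex>
#include <queue>

// Sketch only: an RAII handle that keeps the mutex locked for its lifetime
// and exposes the guarded data through the public `value` reference.
template <typename Data>
class acquired_lock {
    std::unique_lock<std::mutex> lock;

  public:
    acquired_lock(std::mutex &mtx, Data *data) : lock(mtx), value(*data) {}
    Data &value;  // valid only while this handle is alive
};

// Sketch only: the lock owns its data, so callers cannot touch the data
// without going through acquire().
template <typename Data>
class owning_lock {
    std::mutex mtx;
    Data data{};

  public:
    // Locks the mutex; the returned handle unlocks it on destruction.
    acquired_lock<Data> acquire() { return acquired_lock<Data>(mtx, &data); }
};

// Usage in the two shapes the patch uses. `int` stands in for spawn_request_t.
struct thread_data_t {
    std::queue<int> request_queue;
    int thread_count = 0;
};
static owning_lock<thread_data_t> s_spawn_requests;

int main() {
    // One-liner: the temporary handle holds the lock for a single statement.
    s_spawn_requests.acquire().value.request_queue.push(1);

    // Named locker: the lock is held until `locker` goes out of scope,
    // covering the push and the thread_count update atomically.
    auto locker = s_spawn_requests.acquire();
    locker.value.request_queue.push(2);
    locker.value.thread_count++;
    return 0;
}

The payoff visible in the patch: the ASSERT_IS_LOCKED runtime checks disappear, because unlocked access to request_queue and thread_count is no longer expressible in the first place.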