Adopt owning_lock in iothread.cpp

ridiculousfish 2017-01-29 21:06:46 -08:00
parent 0df65a106d
commit 4ac2cfba61

src/iothread.cpp

@@ -64,14 +64,13 @@ struct main_thread_request_t {
     main_thread_request_t(main_thread_request_t &&) = delete;
 };
 
-// Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue,
-// at which point they can be deallocated. s_active_thread_count is also protected by the lock.
-static pthread_mutex_t s_spawn_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static std::queue<spawn_request_t> s_request_queue;
-static int s_active_thread_count;
-
-static pthread_mutex_t s_result_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static std::queue<spawn_request_t> s_result_queue;
+// Spawn support. Requests are allocated and come in on request_queue and go out on result_queue
+struct thread_data_t {
+    std::queue<spawn_request_t> request_queue;
+    int thread_count = 0;
+};
+static owning_lock<thread_data_t> s_spawn_requests;
+static owning_lock<std::queue<spawn_request_t>> s_result_queue;
 
 // "Do on main thread" support.
 static pthread_mutex_t s_main_thread_performer_lock =
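The owning_lock pattern bundles a mutex together with the data it protects, so the request queue and thread count can only be reached by taking the lock. fish defines owning_lock elsewhere in the tree; a minimal sketch consistent with the acquire()/.value usage in this diff (an illustration, not the real class) could look like:

#include <mutex>

// Illustrative owning_lock sketch; NOT fish's actual implementation.
template <typename Data>
class owning_lock {
    std::mutex lock_;
    Data data_;

   public:
    // RAII handle: holds the mutex for its lifetime and exposes the data.
    struct locked_ref {
        std::unique_lock<std::mutex> guard;
        Data &value;
    };

    locked_ref acquire() { return {std::unique_lock<std::mutex>(lock_), data_}; }
};

Compared with the old pairing of a bare pthread_mutex_t and free-standing globals, the compiler now enforces what ASSERT_IS_LOCKED could only check at runtime: there is no way to name the queue without first acquiring.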
@@ -103,24 +102,19 @@ static void iothread_init(void) {
     }
 }
 
-static void add_to_queue(struct spawn_request_t req) {
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    s_request_queue.push(std::move(req));
-}
-
 static bool dequeue_spawn_request(spawn_request_t *result) {
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    if (!s_request_queue.empty()) {
-        *result = std::move(s_request_queue.front());
-        s_request_queue.pop();
+    auto locker = s_spawn_requests.acquire();
+    thread_data_t &td = locker.value;
+    if (!td.request_queue.empty()) {
+        *result = std::move(td.request_queue.front());
+        td.request_queue.pop();
         return true;
     }
     return false;
 }
 
 static void enqueue_thread_result(spawn_request_t req) {
-    scoped_lock locker(s_result_queue_lock);
-    s_result_queue.push(std::move(req));
+    s_result_queue.acquire().value.push(std::move(req));
 }
 
 static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
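The one-line body of enqueue_thread_result leans on temporary lifetime: the handle returned by acquire() lives until the end of the full expression, so the push happens under the mutex and the mutex is released at the semicolon. A tiny demonstration, reusing the sketch above (the int queue and the enqueue function are hypothetical):

#include <queue>

static owning_lock<std::queue<int>> results;  // hypothetical example queue

static void enqueue(int v) {
    // acquire() locks; push(v) runs under the lock; the temporary handle
    // is destroyed at the end of the statement, releasing the mutex.
    results.acquire().value.push(v);
    // No lock is held here.
}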
@@ -128,14 +122,11 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
 /// The function that does thread work.
 static void *iothread_worker(void *unused) {
     UNUSED(unused);
-    scoped_lock locker(s_spawn_queue_lock);
     struct spawn_request_t req;
     while (dequeue_spawn_request(&req)) {
         debug(5, "pthread %p dequeued\n", this_thread());
-        // Unlock the queue while we execute the request.
-        locker.unlock();
 
-        // Perform the work.
+        // Perform the work
         req.handler();
 
         // If there's a completion handler, we have to enqueue it on the result queue.
@@ -146,21 +137,17 @@ static void *iothread_worker(void *unused) {
             const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
             VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
         }
-
-        // Lock us up again.
-        locker.lock();
     }
 
     // We believe we have exhausted the thread request queue. We want to decrement
-    // s_active_thread_count and exit. But it's possible that a request just came in. Furthermore,
-    // it's possible that the main thread saw that s_active_thread_count is full, and decided to not
+    // thread_count and exit. But it's possible that a request just came in. Furthermore,
+    // it's possible that the main thread saw that thread_count is full, and decided to not
     // spawn a new thread, trusting in one of the existing threads to handle it. But we've already
     // committed to not handling anything else. Therefore, we have to decrement
-    // s_active_thread_count under the lock, which we still hold. Likewise, the main thread must
+    // the thread count under the lock, which we still hold. Likewise, the main thread must
     // check the value under the lock.
-    ASSERT_IS_LOCKED(s_spawn_queue_lock);
-    assert(s_active_thread_count > 0);
-    s_active_thread_count -= 1;
+    int new_thread_count = --s_spawn_requests.acquire().value.thread_count;
+    assert(new_thread_count >= 0);
     debug(5, "pthread %p exiting\n", this_thread());
 
     // We're done.
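The comment block above is the exit protocol: a worker may only retire after decrementing the same count that iothread_perform_impl consults when deciding whether to spawn. Note that with this change the lock is no longer held continuously from the failed dequeue to the decrement (dequeue_spawn_request now releases it on return), so each step is serialized individually; the phrase "which we still hold" reads as a holdover from the scoped_lock version. The decrement itself, condensed (the function name is hypothetical; the decrement line matches the diff):

#include <cassert>

// Hypothetical condensation of the worker's retirement step.
static void worker_retire() {
    // The pre-decrement is evaluated while the temporary handle from
    // acquire() is alive, so it cannot interleave with the producer's
    // read-and-increment of thread_count in iothread_perform_impl.
    int new_thread_count = --s_spawn_requests.acquire().value.thread_count;
    assert(new_thread_count >= 0);  // never more retirements than spawns
}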
@@ -199,14 +186,14 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion)
     bool spawn_new_thread = false;
     {
         // Lock around a local region.
-        // Note that we can only access s_active_thread_count under the lock.
-        scoped_lock locker(s_spawn_queue_lock);
-        add_to_queue(std::move(req));
-        if (s_active_thread_count < IO_MAX_THREADS) {
-            s_active_thread_count++;
+        auto locker = s_spawn_requests.acquire();
+        thread_data_t &td = locker.value;
+        td.request_queue.push(std::move(req));
+        if (td.thread_count < IO_MAX_THREADS) {
+            td.thread_count++;
             spawn_new_thread = true;
         }
-        local_thread_count = s_active_thread_count;
+        local_thread_count = td.thread_count;
     }
 
     // Kick off the thread if we decided to do so.
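One detail in the hunk above: the braces around locker are the unlock point. The handle dies at the closing brace, so the pthread spawn that follows (outside the shown context) never runs with the mutex held. In miniature, with a hypothetical helper name:

// Hypothetical helper showing the reserve-then-unlock shape used above.
static bool reserve_thread_slot() {
    auto locker = s_spawn_requests.acquire();
    thread_data_t &td = locker.value;
    if (td.thread_count < IO_MAX_THREADS) {
        td.thread_count++;  // reserve the slot while still locked
        return true;
    }
    return false;
}  // locker destroyed here; spawn the thread only after this returns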
@@ -261,21 +248,17 @@ void iothread_drain_all(void) {
     ASSERT_IS_MAIN_THREAD();
     ASSERT_IS_NOT_FORKED_CHILD();
 
-    scoped_lock locker(s_spawn_queue_lock);
-
 #define TIME_DRAIN 0
 #if TIME_DRAIN
-    int thread_count = s_active_thread_count;
+    int thread_count = s_spawn_requests.acquire().value.thread_count;
     double now = timef();
 #endif
 
     // Nasty polling via select().
-    while (s_active_thread_count > 0) {
-        locker.unlock();
+    while (s_spawn_requests.acquire().value.thread_count > 0) {
         if (iothread_wait_for_pending_completions(1000)) {
             iothread_service_completion();
         }
-        locker.lock();
     }
 #if TIME_DRAIN
     double after = timef();
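A subtlety in the drain loop above: the handle created in the while condition is a temporary, destroyed before the body runs, so the old unlock()/lock() dance now happens implicitly and completions are serviced without the mutex held:

// The condition re-acquires the lock on every test; the temporary handle
// is gone by the time the body executes.
while (s_spawn_requests.acquire().value.thread_count > 0) {
    // No lock held here.
    if (iothread_wait_for_pending_completions(1000)) {
        iothread_service_completion();
    }
}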
@@ -323,12 +306,9 @@ static void iothread_service_main_thread_requests(void) {
 static void iothread_service_result_queue() {
     // Move the queue to a local variable.
     std::queue<spawn_request_t> result_queue;
-    {
-        scoped_lock queue_lock(s_result_queue_lock);
-        result_queue.swap(s_result_queue);
-    }
+    s_result_queue.acquire().value.swap(result_queue);
 
-    // Perform each completion in order. We are responsibile for cleaning them up.
+    // Perform each completion in order
     while (!result_queue.empty()) {
         spawn_request_t req = std::move(result_queue.front());
         result_queue.pop();
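The swap is the usual drain idiom: move the shared queue's contents into a local under the lock, then run the completions unlocked, so arbitrary completion handlers never execute while s_result_queue's mutex is held. A generic sketch reusing the owning_lock illustration above (all names hypothetical):

#include <queue>

static owning_lock<std::queue<int>> pending;  // hypothetical shared queue

static void drain_and_process() {
    std::queue<int> local;
    pending.acquire().value.swap(local);  // O(1) swap under the lock
    // Lock released; handlers below may enqueue into `pending` again.
    while (!local.empty()) {
        int item = local.front();
        local.pop();
        (void)item;  // ... run the completion for `item` ...
    }
}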