diff --git a/exec.cpp b/exec.cpp index 85a3696a5..78d64695b 100644 --- a/exec.cpp +++ b/exec.cpp @@ -34,6 +34,7 @@ #include "fallback.h" #include "util.h" +#include "iothread.h" #include "common.h" #include "wutil.h" @@ -833,6 +834,9 @@ static pid_t exec_fork() { ASSERT_IS_MAIN_THREAD(); + /* Make sure we have no outstanding threads before we fork. This is a pretty sketchy thing to do here, both because exec.cpp shouldn't have to know about iothreads, and because the completion handlers may do unexpected things. */ + iothread_drain_all(); + pid_t pid; struct timespec pollint; int i; diff --git a/iothread.cpp b/iothread.cpp index b98dd8f24..3aed2aadb 100644 --- a/iothread.cpp +++ b/iothread.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #define VOMIT_ON_FAILURE(a) do { if (0 != (a)) { int err = errno; fprintf(stderr, "%s failed on line %d in file %s: %d (%s)\n", #a, __LINE__, __FILE__, err, strerror(err)); abort(); }} while (0) @@ -33,7 +34,6 @@ static struct WorkerThread_t { } threads[IO_MAX_THREADS]; struct ThreadedRequest_t { - struct ThreadedRequest_t *next; int sequenceNumber; int (*handler)(void *); @@ -50,14 +50,14 @@ static struct WorkerThread_t *next_vacant_thread_slot(void) { } static pthread_mutex_t s_request_queue_lock; -static struct ThreadedRequest_t *s_request_queue_head; +static std::queue s_request_queue; static int s_last_sequence_number; static int s_read_pipe, s_write_pipe; static void iothread_init(void) { - static int inited = 0; + static bool inited = false; if (! inited) { - inited = 1; + inited = true; /* Initialize the queue lock */ VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL)); @@ -76,16 +76,18 @@ static void iothread_init(void) { } static void add_to_queue(struct ThreadedRequest_t *req) { - //requires that the queue lock be held - if (s_request_queue_head == NULL) { - s_request_queue_head = req; - } else { - struct ThreadedRequest_t *last_in_queue = s_request_queue_head; - while (last_in_queue->next != NULL) { - last_in_queue = last_in_queue->next; - } - last_in_queue->next = req; - } + ASSERT_IS_LOCKED(s_request_queue_lock); + s_request_queue.push(req); +} + +static ThreadedRequest_t *dequeue_request(void) { + ThreadedRequest_t *result = NULL; + scoped_lock lock(s_request_queue_lock); + if (! s_request_queue.empty()) { + result = s_request_queue.front(); + s_request_queue.pop(); + } + return result; } /* The function that does thread work. */ @@ -99,13 +101,7 @@ static void *iothread_worker(void *threadPtr) { VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &set, NULL)); /* Grab a request off of the queue */ - struct ThreadedRequest_t *req; - VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock)); - req = s_request_queue_head; - if (req) { - s_request_queue_head = req->next; - } - VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); + struct ThreadedRequest_t *req = dequeue_request(); /* Run the handler and store the result */ if (req) { @@ -121,7 +117,8 @@ static void *iothread_worker(void *threadPtr) { /* Spawn another thread if there's work to be done. */ static void iothread_spawn_if_needed(void) { - if (s_request_queue_head != NULL && s_active_thread_count < IO_MAX_THREADS) { + ASSERT_IS_LOCKED(s_request_queue_lock); + if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) { struct WorkerThread_t *thread = next_vacant_thread_slot(); assert(thread != NULL); @@ -147,24 +144,19 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi /* Create and initialize a request. */ struct ThreadedRequest_t *req = new ThreadedRequest_t(); - req->next = NULL; req->handler = handler; req->completionCallback = completionCallback; req->context = context; req->sequenceNumber = ++s_last_sequence_number; - - /* Take the queue lock */ - VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock)); - - /* Add to the queue */ - add_to_queue(req); - - /* Spawn a thread if necessary */ - iothread_spawn_if_needed(); - - /* Unlock */ - VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); + + /* Take our lock */ + scoped_lock lock(s_request_queue_lock); + /* Add to the queue */ + add_to_queue(req); + + /* Spawn a thread if necessary */ + iothread_spawn_if_needed(); return 0; } @@ -174,6 +166,7 @@ int iothread_port(void) { } void iothread_service_completion(void) { + ASSERT_IS_MAIN_THREAD(); ThreadIndex_t threadIdx = (ThreadIndex_t)-1; VOMIT_ON_FAILURE(! read(iothread_port(), &threadIdx, sizeof threadIdx)); assert(threadIdx < IO_MAX_THREADS); @@ -201,3 +194,11 @@ void iothread_service_completion(void) { iothread_spawn_if_needed(); VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); } + +void iothread_drain_all(void) { + ASSERT_IS_MAIN_THREAD(); + ASSERT_IS_NOT_FORKED_CHILD(); + while (s_active_thread_count > 0) { + iothread_service_completion(); + } +} diff --git a/iothread.h b/iothread.h index 7619501ab..fe0f53006 100644 --- a/iothread.h +++ b/iothread.h @@ -22,11 +22,12 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi */ int iothread_port(void); -/** - Services one iothread competion callback. -*/ +/** Services one iothread competion callback. */ void iothread_service_completion(void); +/** Cancels all outstanding requests and waits for all iothreads to terminate. */ +void iothread_drain_all(void); + /** Helper template */ template int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context) {