Change to wait for all outstanding iothreads before calling fork(). This should prevent a whole host of threading/fork interactions, but may also compromise performance...we'll see.

This commit is contained in:
ridiculousfish 2012-02-27 19:46:15 -08:00
parent fdfa5c0602
commit cf54ad8242
3 changed files with 44 additions and 38 deletions

View file

@ -34,6 +34,7 @@
#include "fallback.h"
#include "util.h"
#include "iothread.h"
#include "common.h"
#include "wutil.h"
@ -833,6 +834,9 @@ static pid_t exec_fork()
{
ASSERT_IS_MAIN_THREAD();
/* Make sure we have no outstanding threads before we fork. This is a pretty sketchy thing to do here, both because exec.cpp shouldn't have to know about iothreads, and because the completion handlers may do unexpected things. */
iothread_drain_all();
pid_t pid;
struct timespec pollint;
int i;

View file

@ -9,6 +9,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <queue>
#define VOMIT_ON_FAILURE(a) do { if (0 != (a)) { int err = errno; fprintf(stderr, "%s failed on line %d in file %s: %d (%s)\n", #a, __LINE__, __FILE__, err, strerror(err)); abort(); }} while (0)
@ -33,7 +34,6 @@ static struct WorkerThread_t {
} threads[IO_MAX_THREADS];
struct ThreadedRequest_t {
struct ThreadedRequest_t *next;
int sequenceNumber;
int (*handler)(void *);
@ -50,14 +50,14 @@ static struct WorkerThread_t *next_vacant_thread_slot(void) {
}
static pthread_mutex_t s_request_queue_lock;
static struct ThreadedRequest_t *s_request_queue_head;
static std::queue<ThreadedRequest_t *> s_request_queue;
static int s_last_sequence_number;
static int s_read_pipe, s_write_pipe;
static void iothread_init(void) {
static int inited = 0;
static bool inited = false;
if (! inited) {
inited = 1;
inited = true;
/* Initialize the queue lock */
VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL));
@ -76,16 +76,18 @@ static void iothread_init(void) {
}
static void add_to_queue(struct ThreadedRequest_t *req) {
//requires that the queue lock be held
if (s_request_queue_head == NULL) {
s_request_queue_head = req;
} else {
struct ThreadedRequest_t *last_in_queue = s_request_queue_head;
while (last_in_queue->next != NULL) {
last_in_queue = last_in_queue->next;
}
last_in_queue->next = req;
ASSERT_IS_LOCKED(s_request_queue_lock);
s_request_queue.push(req);
}
static ThreadedRequest_t *dequeue_request(void) {
ThreadedRequest_t *result = NULL;
scoped_lock lock(s_request_queue_lock);
if (! s_request_queue.empty()) {
result = s_request_queue.front();
s_request_queue.pop();
}
return result;
}
/* The function that does thread work. */
@ -99,13 +101,7 @@ static void *iothread_worker(void *threadPtr) {
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &set, NULL));
/* Grab a request off of the queue */
struct ThreadedRequest_t *req;
VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
req = s_request_queue_head;
if (req) {
s_request_queue_head = req->next;
}
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
struct ThreadedRequest_t *req = dequeue_request();
/* Run the handler and store the result */
if (req) {
@ -121,7 +117,8 @@ static void *iothread_worker(void *threadPtr) {
/* Spawn another thread if there's work to be done. */
static void iothread_spawn_if_needed(void) {
if (s_request_queue_head != NULL && s_active_thread_count < IO_MAX_THREADS) {
ASSERT_IS_LOCKED(s_request_queue_lock);
if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) {
struct WorkerThread_t *thread = next_vacant_thread_slot();
assert(thread != NULL);
@ -147,24 +144,19 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
/* Create and initialize a request. */
struct ThreadedRequest_t *req = new ThreadedRequest_t();
req->next = NULL;
req->handler = handler;
req->completionCallback = completionCallback;
req->context = context;
req->sequenceNumber = ++s_last_sequence_number;
/* Take the queue lock */
VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
/* Take our lock */
scoped_lock lock(s_request_queue_lock);
/* Add to the queue */
add_to_queue(req);
/* Spawn a thread if necessary */
iothread_spawn_if_needed();
/* Unlock */
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
return 0;
}
@ -174,6 +166,7 @@ int iothread_port(void) {
}
void iothread_service_completion(void) {
ASSERT_IS_MAIN_THREAD();
ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
VOMIT_ON_FAILURE(! read(iothread_port(), &threadIdx, sizeof threadIdx));
assert(threadIdx < IO_MAX_THREADS);
@ -201,3 +194,11 @@ void iothread_service_completion(void) {
iothread_spawn_if_needed();
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
}
void iothread_drain_all(void) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
while (s_active_thread_count > 0) {
iothread_service_completion();
}
}

View file

@ -22,11 +22,12 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
*/
int iothread_port(void);
/**
Services one iothread competion callback.
*/
/** Services one iothread competion callback. */
void iothread_service_completion(void);
/** Cancels all outstanding requests and waits for all iothreads to terminate. */
void iothread_drain_all(void);
/** Helper template */
template<typename T>
int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context) {