2012-02-28 02:43:24 +00:00
|
|
|
#include "config.h"
|
2011-12-27 05:21:12 +00:00
|
|
|
#include "iothread.h"
|
2012-02-28 02:43:24 +00:00
|
|
|
#include "common.h"
|
2011-12-27 05:21:12 +00:00
|
|
|
#include <pthread.h>
|
|
|
|
#include <assert.h>
|
|
|
|
#include <errno.h>
|
|
|
|
#include <stdio.h>
|
|
|
|
#include <string.h>
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <unistd.h>
|
2012-01-23 05:40:08 +00:00
|
|
|
#include <signal.h>
|
2012-03-01 01:55:50 +00:00
|
|
|
#include <fcntl.h>
|
2012-02-28 03:46:15 +00:00
|
|
|
#include <queue>
|
2011-12-27 05:21:12 +00:00
|
|
|
|
|
|
|
/* Upper bound on the number of worker threads. Use the platform's
   _POSIX_THREAD_THREADS_MAX when it is defined and smaller than 64;
   otherwise fall back to 64. */
#if defined(_POSIX_THREAD_THREADS_MAX) && _POSIX_THREAD_THREADS_MAX < 64
#define IO_MAX_THREADS _POSIX_THREAD_THREADS_MAX
#endif

#ifndef IO_MAX_THREADS
#define IO_MAX_THREADS 64
#endif
|
|
|
|
|
|
|
|
/* Number of worker threads that have been spawned and not yet joined. */
static int s_active_thread_count = 0;

/* Index of a slot in the worker-thread table. It is a single byte, which is
   enough because IO_MAX_THREADS never exceeds 64. */
typedef unsigned char ThreadIndex_t;
|
|
|
|
|
|
|
|
static struct WorkerThread_t {
|
|
|
|
ThreadIndex_t idx;
|
|
|
|
pthread_t thread;
|
|
|
|
} threads[IO_MAX_THREADS];
|
|
|
|
|
|
|
|
/* A unit of work handed to a worker thread, together with the callback that
   is invoked once the work has finished. */
struct ThreadedRequest_t {
    /* Value of the request counter at the time this request was created. */
    int sequenceNumber;

    /* Function run on the worker thread; it receives `context`. */
    int (*handler)(void *context);
    /* Optional function run after the worker has been joined; it receives
       `context` and the handler's return value. May be NULL. */
    void (*completionCallback)(void *context, int result);
    /* Opaque pointer passed through to both functions. */
    void *context;
    /* Return value of `handler`, stored by the worker thread. */
    int handlerResult;
};
|
|
|
|
|
|
|
|
static struct WorkerThread_t *next_vacant_thread_slot(void) {
|
|
|
|
for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++) {
|
|
|
|
if (! threads[i].thread) return &threads[i];
|
|
|
|
}
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
static pthread_mutex_t s_request_queue_lock;
|
2012-02-28 03:46:15 +00:00
|
|
|
static std::queue<ThreadedRequest_t *> s_request_queue;
|
2011-12-27 05:21:12 +00:00
|
|
|
static int s_last_sequence_number;
|
|
|
|
static int s_read_pipe, s_write_pipe;
|
|
|
|
|
|
|
|
static void iothread_init(void) {
|
2012-02-28 03:46:15 +00:00
|
|
|
static bool inited = false;
|
2011-12-27 05:21:12 +00:00
|
|
|
if (! inited) {
|
2012-02-28 03:46:15 +00:00
|
|
|
inited = true;
|
2011-12-27 05:21:12 +00:00
|
|
|
|
|
|
|
/* Initialize the queue lock */
|
|
|
|
VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL));
|
|
|
|
|
|
|
|
/* Initialize the completion pipes */
|
|
|
|
int pipes[2] = {0, 0};
|
|
|
|
VOMIT_ON_FAILURE(pipe(pipes));
|
|
|
|
s_read_pipe = pipes[0];
|
|
|
|
s_write_pipe = pipes[1];
|
2012-03-01 01:55:50 +00:00
|
|
|
|
|
|
|
// 0 means success to VOMIT_ON_FAILURE. Arrange to pass 0 if fcntl returns anything other than -1.
|
|
|
|
VOMIT_ON_FAILURE(-1 == fcntl(s_read_pipe, F_SETFD, FD_CLOEXEC));
|
|
|
|
VOMIT_ON_FAILURE(-1 == fcntl(s_write_pipe, F_SETFD, FD_CLOEXEC));
|
2011-12-27 05:21:12 +00:00
|
|
|
|
|
|
|
/* Tell each thread its index */
|
|
|
|
for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++) {
|
|
|
|
threads[i].idx = i;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void add_to_queue(struct ThreadedRequest_t *req) {
|
2012-02-28 03:46:15 +00:00
|
|
|
ASSERT_IS_LOCKED(s_request_queue_lock);
|
|
|
|
s_request_queue.push(req);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Remove and return the oldest pending request, or NULL if the queue is
   empty. Takes s_request_queue_lock for the duration of the call. */
static ThreadedRequest_t *dequeue_request(void) {
    scoped_lock lock(s_request_queue_lock);
    if (s_request_queue.empty()) {
        return NULL;
    }
    ThreadedRequest_t *oldest = s_request_queue.front();
    s_request_queue.pop();
    return oldest;
}
|
|
|
|
|
|
|
|
/* The function that does thread work. */
|
|
|
|
static void *iothread_worker(void *threadPtr) {
|
|
|
|
assert(threadPtr != NULL);
|
|
|
|
struct WorkerThread_t *thread = (struct WorkerThread_t *)threadPtr;
|
|
|
|
|
2012-01-23 05:40:08 +00:00
|
|
|
// We don't want to receive signals on this thread
|
|
|
|
sigset_t set;
|
|
|
|
sigfillset(&set);
|
|
|
|
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &set, NULL));
|
|
|
|
|
2011-12-27 05:21:12 +00:00
|
|
|
/* Grab a request off of the queue */
|
2012-02-28 03:46:15 +00:00
|
|
|
struct ThreadedRequest_t *req = dequeue_request();
|
2011-12-27 05:21:12 +00:00
|
|
|
|
|
|
|
/* Run the handler and store the result */
|
|
|
|
if (req) {
|
|
|
|
req->handlerResult = req->handler(req->context);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Write our index to wake up the main thread */
|
2012-03-01 01:55:50 +00:00
|
|
|
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, (const char *)&thread->idx, sizeof thread->idx));
|
2011-12-27 05:21:12 +00:00
|
|
|
|
|
|
|
/* We're done */
|
|
|
|
return req;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Spawn another thread if there's work to be done. */
|
|
|
|
static void iothread_spawn_if_needed(void) {
|
2012-02-28 03:46:15 +00:00
|
|
|
ASSERT_IS_LOCKED(s_request_queue_lock);
|
|
|
|
if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) {
|
2011-12-27 05:21:12 +00:00
|
|
|
struct WorkerThread_t *thread = next_vacant_thread_slot();
|
|
|
|
assert(thread != NULL);
|
|
|
|
|
|
|
|
/* Spawn a thread */
|
|
|
|
int err;
|
|
|
|
do {
|
|
|
|
err = 0;
|
|
|
|
if (pthread_create(&thread->thread, NULL, iothread_worker, thread)) {
|
|
|
|
err = errno;
|
|
|
|
}
|
|
|
|
} while (err == EAGAIN);
|
|
|
|
assert(err == 0);
|
|
|
|
|
|
|
|
/* Note that we are spawned another thread */
|
|
|
|
s_active_thread_count += 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-02-15 19:33:41 +00:00
|
|
|
int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int), void *context) {
|
2012-02-28 02:43:24 +00:00
|
|
|
ASSERT_IS_MAIN_THREAD();
|
|
|
|
ASSERT_IS_NOT_FORKED_CHILD();
|
2011-12-27 05:21:12 +00:00
|
|
|
iothread_init();
|
|
|
|
|
|
|
|
/* Create and initialize a request. */
|
2012-02-17 22:54:58 +00:00
|
|
|
struct ThreadedRequest_t *req = new ThreadedRequest_t();
|
2011-12-27 05:21:12 +00:00
|
|
|
req->handler = handler;
|
|
|
|
req->completionCallback = completionCallback;
|
|
|
|
req->context = context;
|
|
|
|
req->sequenceNumber = ++s_last_sequence_number;
|
2012-02-28 03:46:15 +00:00
|
|
|
|
|
|
|
/* Take our lock */
|
|
|
|
scoped_lock lock(s_request_queue_lock);
|
|
|
|
|
|
|
|
/* Add to the queue */
|
|
|
|
add_to_queue(req);
|
2011-12-27 05:21:12 +00:00
|
|
|
|
2012-02-28 03:46:15 +00:00
|
|
|
/* Spawn a thread if necessary */
|
|
|
|
iothread_spawn_if_needed();
|
2011-12-27 05:21:12 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
int iothread_port(void) {
|
|
|
|
iothread_init();
|
|
|
|
return s_read_pipe;
|
|
|
|
}
|
|
|
|
|
|
|
|
void iothread_service_completion(void) {
|
2012-02-28 03:46:15 +00:00
|
|
|
ASSERT_IS_MAIN_THREAD();
|
2011-12-27 05:21:12 +00:00
|
|
|
ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
|
2012-03-01 01:55:50 +00:00
|
|
|
VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &threadIdx, sizeof threadIdx));
|
2011-12-27 05:21:12 +00:00
|
|
|
assert(threadIdx < IO_MAX_THREADS);
|
|
|
|
|
|
|
|
struct WorkerThread_t *thread = &threads[threadIdx];
|
|
|
|
assert(thread->thread != 0);
|
|
|
|
|
|
|
|
struct ThreadedRequest_t *req = NULL;
|
|
|
|
VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req));
|
|
|
|
|
|
|
|
/* Free up this thread */
|
|
|
|
thread->thread = 0;
|
|
|
|
assert(s_active_thread_count > 0);
|
|
|
|
s_active_thread_count -= 1;
|
|
|
|
|
|
|
|
/* Handle the request */
|
2012-02-17 22:54:58 +00:00
|
|
|
if (req) {
|
|
|
|
if (req->completionCallback)
|
|
|
|
req->completionCallback(req->context, req->handlerResult);
|
|
|
|
delete req;
|
|
|
|
}
|
|
|
|
|
2011-12-27 05:21:12 +00:00
|
|
|
/* Maybe spawn another thread, if there's more work to be done. */
|
|
|
|
VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
|
|
|
|
iothread_spawn_if_needed();
|
|
|
|
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
|
|
|
|
}
|
2012-02-28 03:46:15 +00:00
|
|
|
|
|
|
|
void iothread_drain_all(void) {
|
|
|
|
ASSERT_IS_MAIN_THREAD();
|
2012-03-01 01:55:50 +00:00
|
|
|
ASSERT_IS_NOT_FORKED_CHILD();
|
2012-02-28 03:46:15 +00:00
|
|
|
while (s_active_thread_count > 0) {
|
|
|
|
iothread_service_completion();
|
|
|
|
}
|
|
|
|
}
|