Implemented iothread_perform_on_main() to support background threads

scheduling work on main thread
This commit is contained in:
ridiculousfish 2013-11-27 16:04:12 -08:00
parent e0b78f7f2a
commit 9f986d8a86
3 changed files with 189 additions and 40 deletions

View file

@ -489,6 +489,50 @@ static void test_fork(void)
#undef FORK_COUNT
}
// Little function that runs in the main thread
static int test_iothread_main_call(int *addr)
{
*addr += 1;
return *addr;
}
// Little function that runs in a background thread, bouncing to the main
static int test_iothread_thread_call(int *addr)
{
int before = *addr;
iothread_perform_on_main(test_iothread_main_call, addr);
int after = *addr;
// Must have incremented it at least once
if (before >= after)
{
err(L"Failed to increment from background thread");
}
return after;
}
static void test_iothread(void)
{
say(L"Testing iothreads");
int *int_ptr = new int(0);
int iterations = 1000;
for (int i=0; i < iterations; i++)
{
iothread_perform(test_iothread_thread_call, (void (*)(int *, int))NULL, int_ptr);
}
// Now wait until we're done
iothread_drain_all();
// Should have incremented it once per thread
if (*int_ptr != iterations)
{
say(L"Expected int to be %d, but instead it was %d", iterations, *int_ptr);
}
delete int_ptr;
}
/**
Test the parser
*/
@ -1878,6 +1922,7 @@ int main(int argc, char **argv)
test_convert_nulls();
test_tok();
test_fork();
test_iothread();
test_parser();
test_utils();
test_escape_sequences();

View file

@ -22,6 +22,11 @@
#define IO_MAX_THREADS 64
#endif
/* A special "thread index" that means service main thread requests */
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99
static void iothread_service_main_thread_requests(void);
static int s_active_thread_count;
typedef unsigned char ThreadIndex_t;
@ -32,16 +37,22 @@ static struct WorkerThread_t
pthread_t thread;
} threads[IO_MAX_THREADS];
struct ThreadedRequest_t
struct SpawnRequest_t
{
int sequenceNumber;
int (*handler)(void *);
void (*completionCallback)(void *, int);
void *context;
int handlerResult;
};
struct MainThreadRequest_t
{
int (*handler)(void *);
void *context;
volatile int handlerResult;
volatile bool done;
};
static struct WorkerThread_t *next_vacant_thread_slot(void)
{
for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++)
@ -51,9 +62,17 @@ static struct WorkerThread_t *next_vacant_thread_slot(void)
return NULL;
}
static pthread_mutex_t s_request_queue_lock;
static std::queue<ThreadedRequest_t *> s_request_queue;
static int s_last_sequence_number;
/* Spawn support */
static pthread_mutex_t s_spawn_queue_lock;
static std::queue<SpawnRequest_t *> s_request_queue;
/* "Do on main thread" support */
static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests
static pthread_cond_t s_main_thread_performer_condition; //protects the main thread requests
static pthread_mutex_t s_main_thread_request_queue_lock; // protects the queue
static std::queue<MainThreadRequest_t *> s_main_thread_request_queue;
/* Notifying pipes */
static int s_read_pipe, s_write_pipe;
static void iothread_init(void)
@ -63,8 +82,11 @@ static void iothread_init(void)
{
inited = true;
/* Initialize the queue lock */
VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL));
/* Initialize some locks */
VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL));
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_condition, NULL));
/* Initialize the completion pipes */
int pipes[2] = {0, 0};
@ -84,16 +106,16 @@ static void iothread_init(void)
}
}
static void add_to_queue(struct ThreadedRequest_t *req)
static void add_to_queue(struct SpawnRequest_t *req)
{
ASSERT_IS_LOCKED(s_request_queue_lock);
ASSERT_IS_LOCKED(s_spawn_queue_lock);
s_request_queue.push(req);
}
static ThreadedRequest_t *dequeue_request(void)
static SpawnRequest_t *dequeue_spawn_request(void)
{
ThreadedRequest_t *result = NULL;
scoped_lock lock(s_request_queue_lock);
SpawnRequest_t *result = NULL;
scoped_lock lock(s_spawn_queue_lock);
if (! s_request_queue.empty())
{
result = s_request_queue.front();
@ -109,7 +131,7 @@ static void *iothread_worker(void *threadPtr)
struct WorkerThread_t *thread = (struct WorkerThread_t *)threadPtr;
/* Grab a request off of the queue */
struct ThreadedRequest_t *req = dequeue_request();
struct SpawnRequest_t *req = dequeue_spawn_request();
/* Run the handler and store the result */
if (req)
@ -127,7 +149,7 @@ static void *iothread_worker(void *threadPtr)
/* Spawn another thread if there's work to be done. */
static void iothread_spawn_if_needed(void)
{
ASSERT_IS_LOCKED(s_request_queue_lock);
ASSERT_IS_LOCKED(s_spawn_queue_lock);
if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS)
{
struct WorkerThread_t *thread = next_vacant_thread_slot();
@ -168,14 +190,13 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
iothread_init();
/* Create and initialize a request. */
struct ThreadedRequest_t *req = new ThreadedRequest_t();
struct SpawnRequest_t *req = new SpawnRequest_t();
req->handler = handler;
req->completionCallback = completionCallback;
req->context = context;
req->sequenceNumber = ++s_last_sequence_number;
/* Take our lock */
scoped_lock lock(s_request_queue_lock);
scoped_lock lock(s_spawn_queue_lock);
/* Add to the queue */
add_to_queue(req);
@ -196,31 +217,38 @@ void iothread_service_completion(void)
ASSERT_IS_MAIN_THREAD();
ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &threadIdx, sizeof threadIdx));
assert(threadIdx < IO_MAX_THREADS);
struct WorkerThread_t *thread = &threads[threadIdx];
assert(thread->thread != 0);
struct ThreadedRequest_t *req = NULL;
VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req));
/* Free up this thread */
thread->thread = 0;
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
/* Handle the request */
if (req)
if (threadIdx == IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE)
{
if (req->completionCallback)
req->completionCallback(req->context, req->handlerResult);
delete req;
iothread_service_main_thread_requests();
}
else
{
assert(threadIdx < IO_MAX_THREADS);
/* Maybe spawn another thread, if there's more work to be done. */
VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
iothread_spawn_if_needed();
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
struct WorkerThread_t *thread = &threads[threadIdx];
assert(thread->thread != 0);
struct SpawnRequest_t *req = NULL;
VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req));
/* Free up this thread */
thread->thread = 0;
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
/* Handle the request */
if (req)
{
if (req->completionCallback)
req->completionCallback(req->context, req->handlerResult);
delete req;
}
/* Maybe spawn another thread, if there's more work to be done. */
scoped_lock locker(s_spawn_queue_lock);
iothread_spawn_if_needed();
}
}
void iothread_drain_all(void)
@ -243,3 +271,68 @@ void iothread_drain_all(void)
printf("(Waited %.02f msec for %d thread(s) to drain)\n", 1000 * (after - now), thread_count);
#endif
}
/* "Do on main thread" support */
static void iothread_service_main_thread_requests(void)
{
ASSERT_IS_MAIN_THREAD();
// Move the queue to a local variable
std::queue<MainThreadRequest_t *> request_queue;
{
scoped_lock queue_lock(s_main_thread_request_queue_lock);
std::swap(request_queue, s_main_thread_request_queue);
}
if (! request_queue.empty())
{
// Perform each of the functions
// Note we are NOT responsible for deleting these. They are stack allocated in their respective threads!
scoped_lock cond_lock(s_main_thread_performer_lock);
while (! request_queue.empty())
{
MainThreadRequest_t *req = request_queue.front();
request_queue.pop();
req->handlerResult = req->handler(req->context);
req->done = true;
}
// Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked
VOMIT_ON_FAILURE(pthread_cond_broadcast(&s_main_thread_performer_condition));
}
}
int iothread_perform_on_main_base(int (*handler)(void *), void *context)
{
ASSERT_IS_BACKGROUND_THREAD();
// Make a new request. Note we are synchronous, so this can be stack allocated!
MainThreadRequest_t req;
req.handler = handler;
req.context = context;
req.handlerResult = 0;
req.done = false;
// Append it
{
scoped_lock queue_lock(s_main_thread_request_queue_lock);
s_main_thread_request_queue.push(&req);
}
// Tell the pipe
const ThreadIndex_t idx = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, (const char *)&idx, sizeof idx));
// Wait on the condition, until we're done
scoped_lock perform_lock(s_main_thread_performer_lock);
while (! req.done)
{
// It would be nice to support checking for cancellation here, but the clients need a deterministic way to clean up to avoid leaks
VOMIT_ON_FAILURE(pthread_cond_wait(&s_main_thread_performer_condition, &s_main_thread_performer_lock));
}
// Ok, the request must now be done
assert(req.done);
return req.handlerResult;
}

View file

@ -28,11 +28,22 @@ void iothread_service_completion(void);
/** Waits for all iothreads to terminate. */
void iothread_drain_all(void);
/** Helper template */
/** Performs a function on the main thread, blocking until it completes */
int iothread_perform_on_main_base(int (*handler)(void *), void *context);
/** Helper templates */
template<typename T>
int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context)
{
return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))completionCallback, static_cast<void *>(context));
}
/** Helper templates */
template<typename T>
int iothread_perform_on_main(int (*handler)(T *), T *context)
{
return iothread_perform_on_main_base((int (*)(void *))handler, static_cast<void *>(context));
}
#endif