2012-02-28 02:43:24 +00:00
# include "config.h"
2011-12-27 05:21:12 +00:00
# include "iothread.h"
2012-02-28 02:43:24 +00:00
# include "common.h"
2011-12-27 05:21:12 +00:00
# include <pthread.h>
# include <assert.h>
# include <errno.h>
# include <stdio.h>
# include <string.h>
# include <stdlib.h>
# include <unistd.h>
2012-01-23 05:40:08 +00:00
# include <signal.h>
2012-03-01 01:55:50 +00:00
# include <fcntl.h>
2012-02-28 03:46:15 +00:00
# include <queue>
2011-12-27 05:21:12 +00:00
2012-03-01 01:55:50 +00:00
/* Evaluate `a` exactly once. If it yields a nonzero value, print the failing
   expression together with its source location and the current errno, call
   debug_thread_error() (a convenient place to set a breakpoint), and abort.
   NOTE(review): the message always reports errno; calls that signal failure
   through their return value rather than errno (the pthread_* family) may
   therefore be reported with an unrelated error string. */
#define VOMIT_ON_FAILURE(a)                                                     \
    do {                                                                        \
        if (0 != (a)) {                                                         \
            int vomit_errno = errno;                                            \
            fprintf(stderr,                                                     \
                    "%s failed on line %d in file %s: %d (%s). Break on debug_thread_error to debug.\n", \
                    #a, __LINE__, __FILE__, vomit_errno, strerror(vomit_errno)); \
            debug_thread_error();                                               \
            abort();                                                            \
        }                                                                       \
    } while (0)
2011-12-27 05:21:12 +00:00
/* Upper bound on the number of worker threads alive at once: 64, or
   _POSIX_THREAD_THREADS_MAX when that macro is defined and smaller. */
#if defined(_POSIX_THREAD_THREADS_MAX) && (_POSIX_THREAD_THREADS_MAX < 64)
#define IO_MAX_THREADS _POSIX_THREAD_THREADS_MAX
#endif

#ifndef IO_MAX_THREADS
#define IO_MAX_THREADS 64
#endif
/* Number of worker threads that have been spawned and not yet joined. */
static int s_active_thread_count;

/* Index of a slot in `threads`; a worker writes this value to the completion
   pipe when it finishes. */
typedef unsigned char ThreadIndex_t;

/* Bookkeeping for one worker thread slot. */
struct WorkerThread_t {
    ThreadIndex_t idx;  /* position of this slot within `threads` */
    pthread_t thread;   /* handle of the running thread; zero while the slot is vacant */
};

/* Fixed pool of worker slots. */
static struct WorkerThread_t threads[IO_MAX_THREADS];
/* A unit of work queued for execution on a worker thread. */
struct ThreadedRequest_t {
    /* Value taken from the request counter when the request was created. */
    int sequenceNumber;

    /* Work function run on the worker thread; receives `context`. */
    int (*handler)(void *);

    /* Optional callback run on the main thread after the worker is joined;
       receives `context` and the handler's result. */
    void (*completionCallback)(void *, int);

    /* Opaque pointer passed through to both functions above. */
    void *context;

    /* Return value of `handler`, stored by the worker thread. */
    int handlerResult;
};
static struct WorkerThread_t * next_vacant_thread_slot ( void ) {
for ( ThreadIndex_t i = 0 ; i < IO_MAX_THREADS ; i + + ) {
if ( ! threads [ i ] . thread ) return & threads [ i ] ;
}
return NULL ;
}
static pthread_mutex_t s_request_queue_lock ;
2012-02-28 03:46:15 +00:00
static std : : queue < ThreadedRequest_t * > s_request_queue ;
2011-12-27 05:21:12 +00:00
static int s_last_sequence_number ;
static int s_read_pipe , s_write_pipe ;
static void iothread_init ( void ) {
2012-02-28 03:46:15 +00:00
static bool inited = false ;
2011-12-27 05:21:12 +00:00
if ( ! inited ) {
2012-02-28 03:46:15 +00:00
inited = true ;
2011-12-27 05:21:12 +00:00
/* Initialize the queue lock */
VOMIT_ON_FAILURE ( pthread_mutex_init ( & s_request_queue_lock , NULL ) ) ;
/* Initialize the completion pipes */
int pipes [ 2 ] = { 0 , 0 } ;
VOMIT_ON_FAILURE ( pipe ( pipes ) ) ;
s_read_pipe = pipes [ 0 ] ;
s_write_pipe = pipes [ 1 ] ;
2012-03-01 01:55:50 +00:00
// 0 means success to VOMIT_ON_FAILURE. Arrange to pass 0 if fcntl returns anything other than -1.
VOMIT_ON_FAILURE ( - 1 = = fcntl ( s_read_pipe , F_SETFD , FD_CLOEXEC ) ) ;
VOMIT_ON_FAILURE ( - 1 = = fcntl ( s_write_pipe , F_SETFD , FD_CLOEXEC ) ) ;
2011-12-27 05:21:12 +00:00
/* Tell each thread its index */
for ( ThreadIndex_t i = 0 ; i < IO_MAX_THREADS ; i + + ) {
threads [ i ] . idx = i ;
}
}
}
static void add_to_queue ( struct ThreadedRequest_t * req ) {
2012-02-28 03:46:15 +00:00
ASSERT_IS_LOCKED ( s_request_queue_lock ) ;
s_request_queue . push ( req ) ;
}
/* Removes and returns the request at the front of the pending queue, or
   returns NULL if the queue is empty. Takes s_request_queue_lock for the
   duration of the call. */
static ThreadedRequest_t *dequeue_request(void)
{
    scoped_lock lock(s_request_queue_lock);
    if (s_request_queue.empty()) {
        return NULL;
    }
    ThreadedRequest_t *next = s_request_queue.front();
    s_request_queue.pop();
    return next;
}
/* The function that does thread work. */
static void * iothread_worker ( void * threadPtr ) {
assert ( threadPtr ! = NULL ) ;
struct WorkerThread_t * thread = ( struct WorkerThread_t * ) threadPtr ;
2012-01-23 05:40:08 +00:00
// We don't want to receive signals on this thread
sigset_t set ;
sigfillset ( & set ) ;
VOMIT_ON_FAILURE ( pthread_sigmask ( SIG_SETMASK , & set , NULL ) ) ;
2011-12-27 05:21:12 +00:00
/* Grab a request off of the queue */
2012-02-28 03:46:15 +00:00
struct ThreadedRequest_t * req = dequeue_request ( ) ;
2011-12-27 05:21:12 +00:00
/* Run the handler and store the result */
if ( req ) {
req - > handlerResult = req - > handler ( req - > context ) ;
}
/* Write our index to wake up the main thread */
2012-03-01 01:55:50 +00:00
VOMIT_ON_FAILURE ( ! write_loop ( s_write_pipe , ( const char * ) & thread - > idx , sizeof thread - > idx ) ) ;
2011-12-27 05:21:12 +00:00
/* We're done */
return req ;
}
/* Spawn another thread if there's work to be done. */
static void iothread_spawn_if_needed ( void ) {
2012-02-28 03:46:15 +00:00
ASSERT_IS_LOCKED ( s_request_queue_lock ) ;
if ( ! s_request_queue . empty ( ) & & s_active_thread_count < IO_MAX_THREADS ) {
2011-12-27 05:21:12 +00:00
struct WorkerThread_t * thread = next_vacant_thread_slot ( ) ;
assert ( thread ! = NULL ) ;
/* Spawn a thread */
int err ;
do {
err = 0 ;
if ( pthread_create ( & thread - > thread , NULL , iothread_worker , thread ) ) {
err = errno ;
}
} while ( err = = EAGAIN ) ;
assert ( err = = 0 ) ;
/* Note that we are spawned another thread */
s_active_thread_count + = 1 ;
}
}
2012-02-15 19:33:41 +00:00
int iothread_perform_base ( int ( * handler ) ( void * ) , void ( * completionCallback ) ( void * , int ) , void * context ) {
2012-02-28 02:43:24 +00:00
ASSERT_IS_MAIN_THREAD ( ) ;
ASSERT_IS_NOT_FORKED_CHILD ( ) ;
2011-12-27 05:21:12 +00:00
iothread_init ( ) ;
/* Create and initialize a request. */
2012-02-17 22:54:58 +00:00
struct ThreadedRequest_t * req = new ThreadedRequest_t ( ) ;
2011-12-27 05:21:12 +00:00
req - > handler = handler ;
req - > completionCallback = completionCallback ;
req - > context = context ;
req - > sequenceNumber = + + s_last_sequence_number ;
2012-02-28 03:46:15 +00:00
/* Take our lock */
scoped_lock lock ( s_request_queue_lock ) ;
/* Add to the queue */
add_to_queue ( req ) ;
2011-12-27 05:21:12 +00:00
2012-02-28 03:46:15 +00:00
/* Spawn a thread if necessary */
iothread_spawn_if_needed ( ) ;
2011-12-27 05:21:12 +00:00
return 0 ;
}
/* Returns the read end of the completion pipe, initializing the module first
   if that has not happened yet. A worker writes one slot index to the other
   end of this pipe when it finishes. */
int iothread_port(void)
{
    iothread_init();
    const int readFd = s_read_pipe;
    return readFd;
}
void iothread_service_completion ( void ) {
2012-02-28 03:46:15 +00:00
ASSERT_IS_MAIN_THREAD ( ) ;
2011-12-27 05:21:12 +00:00
ThreadIndex_t threadIdx = ( ThreadIndex_t ) - 1 ;
2012-03-01 01:55:50 +00:00
VOMIT_ON_FAILURE ( 1 ! = read_loop ( iothread_port ( ) , & threadIdx , sizeof threadIdx ) ) ;
2011-12-27 05:21:12 +00:00
assert ( threadIdx < IO_MAX_THREADS ) ;
struct WorkerThread_t * thread = & threads [ threadIdx ] ;
assert ( thread - > thread ! = 0 ) ;
struct ThreadedRequest_t * req = NULL ;
VOMIT_ON_FAILURE ( pthread_join ( thread - > thread , ( void * * ) & req ) ) ;
/* Free up this thread */
thread - > thread = 0 ;
assert ( s_active_thread_count > 0 ) ;
s_active_thread_count - = 1 ;
/* Handle the request */
2012-02-17 22:54:58 +00:00
if ( req ) {
if ( req - > completionCallback )
req - > completionCallback ( req - > context , req - > handlerResult ) ;
delete req ;
}
2011-12-27 05:21:12 +00:00
/* Maybe spawn another thread, if there's more work to be done. */
VOMIT_ON_FAILURE ( pthread_mutex_lock ( & s_request_queue_lock ) ) ;
iothread_spawn_if_needed ( ) ;
VOMIT_ON_FAILURE ( pthread_mutex_unlock ( & s_request_queue_lock ) ) ;
}
2012-02-28 03:46:15 +00:00
void iothread_drain_all ( void ) {
ASSERT_IS_MAIN_THREAD ( ) ;
2012-03-01 01:55:50 +00:00
ASSERT_IS_NOT_FORKED_CHILD ( ) ;
2012-02-28 03:46:15 +00:00
while ( s_active_thread_count > 0 ) {
iothread_service_completion ( ) ;
}
}