Simplify threading implementation. Removed iothread array. Threads now

run detached (no more pthread_join), and will not exit until they see
that all requests have been dequeued.
This commit is contained in:
ridiculousfish 2014-04-17 12:02:43 -07:00
parent 1ce30deec3
commit 3d1a204c83
2 changed files with 168 additions and 124 deletions

View file

@ -494,14 +494,18 @@ static void test_iothread(void)
{
say(L"Testing iothreads");
int *int_ptr = new int(0);
int iterations = 1000;
int iterations = 5000000;
int max_achieved_thread_count = 0;
double start = timef();
for (int i=0; i < iterations; i++)
{
iothread_perform(test_iothread_thread_call, (void (*)(int *, int))NULL, int_ptr);
int thread_count = iothread_perform(test_iothread_thread_call, (void (*)(int *, int))NULL, int_ptr);
max_achieved_thread_count = std::max(max_achieved_thread_count, thread_count);
}
// Now wait until we're done
iothread_drain_all();
double end = timef();
// Should have incremented it once per thread
if (*int_ptr != iterations)
@ -509,6 +513,8 @@ static void test_iothread(void)
say(L"Expected int to be %d, but instead it was %d", iterations, *int_ptr);
}
say(L" (%.02f msec, with max of %d threads)", (end - start) * 1000.0, max_achieved_thread_count);
delete int_ptr;
}

View file

@ -22,20 +22,14 @@
#define IO_MAX_THREADS 64
#endif
/* A special "thread index" that means service main thread requests */
/* Values for the wakeup bytes sent to the ioport */
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99
#define IO_SERVICE_RESULT_QUEUE 100
#define IOTHREAD_LOG if (0)
static void iothread_service_main_thread_requests(void);
static int s_active_thread_count;
typedef unsigned char ThreadIndex_t;
static struct WorkerThread_t
{
ThreadIndex_t idx;
pthread_t thread;
} threads[IO_MAX_THREADS];
static void iothread_service_result_queue();
struct SpawnRequest_t
{
@ -53,18 +47,13 @@ struct MainThreadRequest_t
volatile bool done;
};
static struct WorkerThread_t *next_vacant_thread_slot(void)
{
for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++)
{
if (! threads[i].thread) return &threads[i];
}
return NULL;
}
/* Spawn support */
/* Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue, at which point they can be deallocated. s_active_thread_count is also protected by the lock. */
static pthread_mutex_t s_spawn_queue_lock;
static std::queue<SpawnRequest_t *> s_request_queue;
static volatile int s_active_thread_count;
static pthread_mutex_t s_result_queue_lock;
static std::queue<SpawnRequest_t *> s_result_queue;
/* "Do on main thread" support */
static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests
@ -84,6 +73,7 @@ static void iothread_init(void)
/* Initialize some locks */
VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_result_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL));
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_condition, NULL));
@ -97,12 +87,6 @@ static void iothread_init(void)
// 0 means success to VOMIT_ON_FAILURE. Arrange to pass 0 if fcntl returns anything other than -1.
VOMIT_ON_FAILURE(-1 == fcntl(s_read_pipe, F_SETFD, FD_CLOEXEC));
VOMIT_ON_FAILURE(-1 == fcntl(s_write_pipe, F_SETFD, FD_CLOEXEC));
/* Tell each thread its index */
for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++)
{
threads[i].idx = i;
}
}
}
@ -114,8 +98,8 @@ static void add_to_queue(struct SpawnRequest_t *req)
static SpawnRequest_t *dequeue_spawn_request(void)
{
ASSERT_IS_LOCKED(s_spawn_queue_lock);
SpawnRequest_t *result = NULL;
scoped_lock lock(s_spawn_queue_lock);
if (! s_request_queue.empty())
{
result = s_request_queue.front();
@ -124,63 +108,74 @@ static SpawnRequest_t *dequeue_spawn_request(void)
return result;
}
/* The function that does thread work. */
static void *iothread_worker(void *threadPtr)
static void enqueue_thread_result(SpawnRequest_t *req)
{
assert(threadPtr != NULL);
struct WorkerThread_t *thread = (struct WorkerThread_t *)threadPtr;
/* Grab a request off of the queue */
struct SpawnRequest_t *req = dequeue_spawn_request();
/* Run the handler and store the result */
if (req)
{
req->handlerResult = req->handler(req->context);
}
/* Write our index to wake up the main thread */
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, (const char *)&thread->idx, sizeof thread->idx));
/* We're done */
return req;
scoped_lock lock(s_result_queue_lock);
s_result_queue.push(req);
}
/* Spawn another thread if there's work to be done. */
static void iothread_spawn_if_needed(void)
/* The function that does thread work. */
static void *iothread_worker(void *unused)
{
ASSERT_IS_LOCKED(s_spawn_queue_lock);
if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS)
scoped_lock locker(s_spawn_queue_lock);
struct SpawnRequest_t *req;
while ((req = dequeue_spawn_request()) != NULL)
{
struct WorkerThread_t *thread = next_vacant_thread_slot();
assert(thread != NULL);
IOTHREAD_LOG fprintf(stderr, "pthread %p dequeued %p\n", pthread_self(), req);
/* Unlock the queue while we execute the request */
locker.unlock();
/* The spawned thread inherits our signal mask. We don't want the thread to ever receive signals on the spawned thread, so temporarily block all signals, spawn the thread, and then restore it. */
sigset_t newSet, savedSet;
sigfillset(&newSet);
VOMIT_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &newSet, &savedSet));
/* Perfor the work */
req->handlerResult = req->handler(req->context);
/* Spawn a thread. */
int err;
do
/* If there's a completion handler, we have to enqueue it on the result queue. Otherwise, we can just delete the request! */
if (req->completionCallback == NULL)
{
err = 0;
if (pthread_create(&thread->thread, NULL, iothread_worker, thread))
{
err = errno;
}
delete req;
}
else
{
/* Enqueue the result, and tell the main thread about it */
enqueue_thread_result(req);
fprintf(stderr, "write to %d\n", s_write_pipe);
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
}
while (err == EAGAIN);
/* Need better error handling - perhaps try again later. */
assert(err == 0);
/* Note that we are spawned another thread */
s_active_thread_count += 1;
/* Restore our sigmask */
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &savedSet, NULL));
/* Lock us up again */
locker.lock();
}
/* We believe we have exhausted the thread request queue. We want to decrement s_active_thread_count and exit. But it's possible that a request just came in. Furthermore, it's possible that the main thread saw that s_active_thread_count is full, and decided to not spawn a new thread, trusting in one of the existing threads to handle it. But we've already committed to not handling anything else. Therefore, we have to decrement s_active_thread_count under the lock, which we still hold. Likewise, the main thread must check the value under the lock. */
ASSERT_IS_LOCKED(s_spawn_queue_lock);
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
IOTHREAD_LOG fprintf(stderr, "pthread %p exiting\n", pthread_self());
/* We're done */
return NULL;
}
/* Spawn another thread. No lock is held when this is called. */
static void iothread_spawn()
{
/* The spawned thread inherits our signal mask. We don't want the thread to ever receive signals on the spawned thread, so temporarily block all signals, spawn the thread, and then restore it. */
sigset_t new_set, saved_set;
sigfillset(&new_set);
VOMIT_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));
/* Spawn a thread. If this fails, it means there's already a bunch of threads; it is very unlikely that they are all on the verge of exiting, so one is likely to be ready to handle extant requests. So we can ignore failure with some confidence. */
pthread_t thread = NULL;
pthread_create(&thread, NULL, iothread_worker, NULL);
/* We will never join this thread */
VOMIT_ON_FAILURE(pthread_detach(thread));
IOTHREAD_LOG fprintf(stderr, "pthread %p spawned\n", thread);
/* Restore our sigmask */
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
}
int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int), void *context)
@ -195,15 +190,28 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
req->completionCallback = completionCallback;
req->context = context;
/* Take our lock */
scoped_lock lock(s_spawn_queue_lock);
int local_thread_count = -1;
bool spawn_new_thread = false;
{
/* Lock around a local region. Note that we can only access s_active_thread_count under the lock. */
scoped_lock lock(s_spawn_queue_lock);
add_to_queue(req);
if (s_active_thread_count < IO_MAX_THREADS)
{
s_active_thread_count++;
spawn_new_thread = true;
}
local_thread_count = s_active_thread_count;
}
/* Add to the queue */
add_to_queue(req);
/* Kick off the thread if we decided to do so */
if (spawn_new_thread)
{
iothread_spawn();
}
/* Spawn a thread if necessary */
iothread_spawn_if_needed();
return 0;
/* We return the active thread count for informational purposes only */
return local_thread_count;
}
int iothread_port(void)
@ -215,56 +223,63 @@ int iothread_port(void)
void iothread_service_completion(void)
{
ASSERT_IS_MAIN_THREAD();
ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &threadIdx, sizeof threadIdx));
if (threadIdx == IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE)
char wakeup_byte = 0;
VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &wakeup_byte, sizeof wakeup_byte));
switch (wakeup_byte)
{
iothread_service_main_thread_requests();
}
else
{
assert(threadIdx < IO_MAX_THREADS);
struct WorkerThread_t *thread = &threads[threadIdx];
assert(thread->thread != 0);
struct SpawnRequest_t *req = NULL;
VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req));
/* Free up this thread */
thread->thread = 0;
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
/* Handle the request */
if (req)
{
if (req->completionCallback)
req->completionCallback(req->context, req->handlerResult);
delete req;
}
/* Maybe spawn another thread, if there's more work to be done. */
scoped_lock locker(s_spawn_queue_lock);
iothread_spawn_if_needed();
case IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE:
iothread_service_main_thread_requests();
break;
case IO_SERVICE_RESULT_QUEUE:
iothread_service_result_queue();
break;
default:
fprintf(stderr, "Unknown wakeup byte %02x in %s\n", wakeup_byte, __FUNCTION__);
break;
}
}
static bool iothread_wait_for_pending_completions(long timeout_usec)
{
const long usec_per_sec = 1000000;
struct timeval tv;
tv.tv_sec = timeout_usec / usec_per_sec;
tv.tv_usec = timeout_usec % usec_per_sec;
const int fd = iothread_port();
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
int ret = select(fd + 1, &fds, NULL, NULL, &tv);
return ret > 0;
}
/* Note that this function is quite sketchy. In particular, it drains threads, not requests, meaning that it may leave requests on the queue. This is the desired behavior (it may be called before fork, and we don't want to bother servicing requests before we fork), but in the test suite we depend on it draining all requests. In practice, this works, because a thread in practice won't exit while there is outstanding requests.
At the moment, this function is only used in the test suite and in a drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that it's terrible.
*/
void iothread_drain_all(void)
{
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
/* Hackish. Since we are the only thread that can increment s_active_thread_count, we can check for a zero value without locking; the true value may be smaller than we read, but never bigger. */
if (s_active_thread_count == 0)
return;
#define TIME_DRAIN 0
#if TIME_DRAIN
int thread_count = s_active_thread_count;
double now = timef();
#endif
/* Nasty polling via select(). */
while (s_active_thread_count > 0)
{
iothread_service_completion();
if (iothread_wait_for_pending_completions(1000))
{
iothread_service_completion();
}
}
#if TIME_DRAIN
double after = timef();
@ -273,7 +288,6 @@ void iothread_drain_all(void)
}
/* "Do on main thread" support */
static void iothread_service_main_thread_requests(void)
{
ASSERT_IS_MAIN_THREAD();
@ -303,6 +317,30 @@ static void iothread_service_main_thread_requests(void)
}
}
/* Service the queue of results */
static void iothread_service_result_queue()
{
// Move the queue to a local variable
std::queue<SpawnRequest_t *> result_queue;
{
scoped_lock queue_lock(s_main_thread_request_queue_lock);
std::swap(result_queue, s_result_queue);
}
// Perform each completion in order
// We are responsibile for cleaning them up
while (! result_queue.empty())
{
SpawnRequest_t *req = result_queue.front();
result_queue.pop();
if (req->completionCallback)
{
req->completionCallback(req->context, req->handlerResult);
}
delete req;
}
}
int iothread_perform_on_main_base(int (*handler)(void *), void *context)
{
// If this is the main thread, just do it
@ -325,8 +363,8 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context)
}
// Tell the pipe
const ThreadIndex_t idx = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, (const char *)&idx, sizeof idx));
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
VOMIT_ON_FAILURE(! write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
// Wait on the condition, until we're done
scoped_lock perform_lock(s_main_thread_performer_lock);