Enable use of std::function and lambdas in iothread_perform

This commit is contained in:
ridiculousfish 2017-01-23 11:35:22 -08:00
parent 02ddc20c87
commit 1cfbd62266
2 changed files with 45 additions and 35 deletions

View file

@ -33,13 +33,19 @@
static void iothread_service_main_thread_requests(void);
static void iothread_service_result_queue();
struct spawn_request_t {
int (*handler)(void *) = NULL;
void (*completion)(void *, int) = NULL;
void *context = NULL;
int handler_result = -1;
typedef std::function<void(void)> void_function_t;
spawn_request_t() {}
struct spawn_request_t {
void_function_t handler;
void_function_t completion;
spawn_request_t()
{}
spawn_request_t(void_function_t f, void_function_t comp) :
handler(std::move(f)),
completion(std::move(comp))
{}
// Move-only
spawn_request_t &operator=(const spawn_request_t &) = delete;
@ -50,9 +56,9 @@ struct spawn_request_t {
struct main_thread_request_t {
volatile bool done = false;
std::function<void(void)> func;
void_function_t func;
main_thread_request_t(std::function<void(void)> &&f) : func(f) {}
main_thread_request_t(void_function_t f) : func(std::move(f)) {}
// No moving OR copying
// main_thread_requests are always stack allocated, and we deal in pointers to them
@ -131,10 +137,11 @@ static void *iothread_worker(void *unused) {
locker.unlock();
// Perform the work.
req.handler_result = req.handler(req.context);
req.handler();
// If there's a completion handler, we have to enqueue it on the result queue.
if (req.completion != NULL) {
// Note we're using std::function's weirdo operator== here
if (req.completion != nullptr) {
// Enqueue the result, and tell the main thread about it.
enqueue_thread_result(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
@ -183,23 +190,17 @@ static void iothread_spawn() {
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
}
int iothread_perform_base(int (*handler)(void *), void (*completion)(void *, int),
void *context) {
int iothread_perform(void_function_t &&func, void_function_t &&completion) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
iothread_init();
// Create and initialize a request.
struct spawn_request_t req;
req.handler = handler;
req.completion = completion;
req.context = context;
struct spawn_request_t req(std::move(func), std::move(completion));
int local_thread_count = -1;
bool spawn_new_thread = false;
{
// Lock around a local region. Note that we can only access s_active_thread_count under the
// lock.
// Lock around a local region.
// Note that we can only access s_active_thread_count under the lock.
scoped_lock locker(s_spawn_queue_lock);
add_to_queue(std::move(req));
if (s_active_thread_count < IO_MAX_THREADS) {
@ -213,8 +214,6 @@ int iothread_perform_base(int (*handler)(void *), void (*completion)(void *, int
if (spawn_new_thread) {
iothread_spawn();
}
// We return the active thread count for informational purposes only.
return local_thread_count;
}
@ -334,13 +333,14 @@ static void iothread_service_result_queue() {
while (!result_queue.empty()) {
spawn_request_t req = std::move(result_queue.front());
result_queue.pop();
if (req.completion) {
req.completion(req.context, req.handler_result);
// ensure we don't invoke empty functions, that raises an exception
if (req.completion != nullptr) {
req.completion();
}
}
}
void iothread_perform_on_main(std::function<void(void)> &&func) {
void iothread_perform_on_main(void_function_t &&func) {
if (is_main_thread()) {
func();
return;

View file

@ -26,19 +26,29 @@ void iothread_service_completion(void);
/// Waits for all iothreads to terminate.
void iothread_drain_all(void);
/// Helper templates.
template <typename T>
int iothread_perform(int (*handler)(T *), void (*completion)(T *, int), T *context) {
return iothread_perform_base((int (*)(void *))handler,
(void (*)(void *, int))completion,
static_cast<void *>(context));
int iothread_perform(std::function<void(void)> &&func,
std::function<void(void)> &&completion = std::function<void(void)>());
// Variant that allows computing a value in func, and then passing it to the completion handler
template<typename T>
int iothread_perform(std::function<T(void)> &&handler, std::function<void(T)> &&completion) {
T *result = new T();
return iothread_perform([=](){ *result = handler(); },
[=](){ completion(std::move(*result)); delete result; }
);
}
/// Legacy templates
template <typename T>
int iothread_perform(int (*handler)(T *), void (*completion)(T *, int), T *context) {
return iothread_perform(std::function<int(void)>([=](){return handler(context);}),
std::function<void(int)>([=](int v){completion(context, v);})
);
}
// Variant that takes no completion callback.
template <typename T>
int iothread_perform(int (*handler)(T *), T *context) {
return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))0,
static_cast<void *>(context));
return iothread_perform([=](){ handler(context); });
}
/// Performs a function on the main thread, blocking until it completes.