mirror of
https://github.com/fish-shell/fish-shell
synced 2024-12-27 05:13:10 +00:00
Clean up some of the memory management in iothread.cpp
Store requests directly on the queue, instead of via a heap allocation
This commit is contained in:
parent
d373f1fc1d
commit
216f7d912a
2 changed files with 60 additions and 54 deletions
110
src/iothread.cpp
110
src/iothread.cpp
|
@ -33,32 +33,48 @@ static void iothread_service_main_thread_requests(void);
|
||||||
static void iothread_service_result_queue();
|
static void iothread_service_result_queue();
|
||||||
|
|
||||||
struct spawn_request_t {
|
struct spawn_request_t {
|
||||||
int (*handler)(void *);
|
int (*handler)(void *) = NULL;
|
||||||
void (*completionCallback)(void *, int);
|
void (*completion)(void *, int) = NULL;
|
||||||
void *context;
|
void *context = NULL;
|
||||||
int handlerResult;
|
int handler_result = -1;
|
||||||
|
|
||||||
|
spawn_request_t() {}
|
||||||
|
|
||||||
|
// Move-only
|
||||||
|
spawn_request_t &operator=(const spawn_request_t &) = delete;
|
||||||
|
spawn_request_t &operator=(spawn_request_t &&) = default;
|
||||||
|
spawn_request_t(const spawn_request_t &) = delete;
|
||||||
|
spawn_request_t(spawn_request_t &&) = default;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct main_thread_request_t {
|
struct main_thread_request_t {
|
||||||
int (*handler)(void *);
|
int (*handler)(void *) = NULL;
|
||||||
void *context;
|
void *context = NULL;
|
||||||
volatile int handlerResult;
|
volatile int handler_result = -1;
|
||||||
volatile bool done;
|
volatile bool done = false;
|
||||||
|
|
||||||
|
main_thread_request_t() {}
|
||||||
|
|
||||||
|
// No moving OR copying
|
||||||
|
// main_thread_requests are always stack allocated, and we deal in pointers to them
|
||||||
|
void operator=(const spawn_request_t &) = delete;
|
||||||
|
main_thread_request_t(const spawn_request_t &) = delete;
|
||||||
|
main_thread_request_t(spawn_request_t &&) = delete;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue,
|
// Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue,
|
||||||
// at which point they can be deallocated. s_active_thread_count is also protected by the lock.
|
// at which point they can be deallocated. s_active_thread_count is also protected by the lock.
|
||||||
static pthread_mutex_t s_spawn_queue_lock;
|
static pthread_mutex_t s_spawn_queue_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static std::queue<spawn_request_t *> s_request_queue;
|
static std::queue<spawn_request_t> s_request_queue;
|
||||||
static int s_active_thread_count;
|
static int s_active_thread_count;
|
||||||
|
|
||||||
static pthread_mutex_t s_result_queue_lock;
|
static pthread_mutex_t s_result_queue_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static std::queue<spawn_request_t *> s_result_queue;
|
static std::queue<spawn_request_t> s_result_queue;
|
||||||
|
|
||||||
// "Do on main thread" support.
|
// "Do on main thread" support.
|
||||||
static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests
|
static pthread_mutex_t s_main_thread_performer_lock = PTHREAD_MUTEX_INITIALIZER; // protects the main thread requests
|
||||||
static pthread_cond_t s_main_thread_performer_cond; // protects the main thread requests
|
static pthread_cond_t s_main_thread_performer_cond; // protects the main thread requests
|
||||||
static pthread_mutex_t s_main_thread_request_q_lock; // protects the queue
|
static pthread_mutex_t s_main_thread_request_q_lock = PTHREAD_MUTEX_INITIALIZER; // protects the queue
|
||||||
static std::queue<main_thread_request_t *> s_main_thread_request_queue;
|
static std::queue<main_thread_request_t *> s_main_thread_request_queue;
|
||||||
|
|
||||||
// Notifying pipes.
|
// Notifying pipes.
|
||||||
|
@ -70,10 +86,6 @@ static void iothread_init(void) {
|
||||||
inited = true;
|
inited = true;
|
||||||
|
|
||||||
// Initialize some locks.
|
// Initialize some locks.
|
||||||
VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL));
|
|
||||||
VOMIT_ON_FAILURE(pthread_mutex_init(&s_result_queue_lock, NULL));
|
|
||||||
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_q_lock, NULL));
|
|
||||||
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL));
|
|
||||||
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_cond, NULL));
|
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_cond, NULL));
|
||||||
|
|
||||||
// Initialize the completion pipes.
|
// Initialize the completion pipes.
|
||||||
|
@ -89,24 +101,24 @@ static void iothread_init(void) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void add_to_queue(struct spawn_request_t *req) {
|
static void add_to_queue(struct spawn_request_t req) {
|
||||||
ASSERT_IS_LOCKED(s_spawn_queue_lock);
|
ASSERT_IS_LOCKED(s_spawn_queue_lock);
|
||||||
s_request_queue.push(req);
|
s_request_queue.push(std::move(req));
|
||||||
}
|
}
|
||||||
|
|
||||||
static spawn_request_t *dequeue_spawn_request(void) {
|
static bool dequeue_spawn_request(spawn_request_t *result) {
|
||||||
ASSERT_IS_LOCKED(s_spawn_queue_lock);
|
ASSERT_IS_LOCKED(s_spawn_queue_lock);
|
||||||
spawn_request_t *result = NULL;
|
|
||||||
if (!s_request_queue.empty()) {
|
if (!s_request_queue.empty()) {
|
||||||
result = s_request_queue.front();
|
*result = std::move(s_request_queue.front());
|
||||||
s_request_queue.pop();
|
s_request_queue.pop();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
return result;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void enqueue_thread_result(spawn_request_t *req) {
|
static void enqueue_thread_result(spawn_request_t req) {
|
||||||
scoped_lock locker(s_result_queue_lock);
|
scoped_lock locker(s_result_queue_lock);
|
||||||
s_result_queue.push(req);
|
s_result_queue.push(std::move(req));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
|
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
|
||||||
|
@ -115,22 +127,19 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
|
||||||
static void *iothread_worker(void *unused) {
|
static void *iothread_worker(void *unused) {
|
||||||
UNUSED(unused);
|
UNUSED(unused);
|
||||||
scoped_lock locker(s_spawn_queue_lock);
|
scoped_lock locker(s_spawn_queue_lock);
|
||||||
struct spawn_request_t *req;
|
struct spawn_request_t req;
|
||||||
while ((req = dequeue_spawn_request()) != NULL) {
|
while (dequeue_spawn_request(&req)) {
|
||||||
debug(5, "pthread %p dequeued %p\n", this_thread(), req);
|
debug(5, "pthread %p dequeued\n", this_thread());
|
||||||
// Unlock the queue while we execute the request.
|
// Unlock the queue while we execute the request.
|
||||||
locker.unlock();
|
locker.unlock();
|
||||||
|
|
||||||
// Perform the work.
|
// Perform the work.
|
||||||
req->handlerResult = req->handler(req->context);
|
req.handler_result = req.handler(req.context);
|
||||||
|
|
||||||
// If there's a completion handler, we have to enqueue it on the result queue. Otherwise, we
|
// If there's a completion handler, we have to enqueue it on the result queue.
|
||||||
// can just delete the request!
|
if (req.completion != NULL) {
|
||||||
if (req->completionCallback == NULL) {
|
|
||||||
delete req;
|
|
||||||
} else {
|
|
||||||
// Enqueue the result, and tell the main thread about it.
|
// Enqueue the result, and tell the main thread about it.
|
||||||
enqueue_thread_result(req);
|
enqueue_thread_result(std::move(req));
|
||||||
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
|
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
|
||||||
VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
|
VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
|
||||||
}
|
}
|
||||||
|
@ -177,17 +186,17 @@ static void iothread_spawn() {
|
||||||
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
|
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int),
|
int iothread_perform_base(int (*handler)(void *), void (*completion)(void *, int),
|
||||||
void *context) {
|
void *context) {
|
||||||
ASSERT_IS_MAIN_THREAD();
|
ASSERT_IS_MAIN_THREAD();
|
||||||
ASSERT_IS_NOT_FORKED_CHILD();
|
ASSERT_IS_NOT_FORKED_CHILD();
|
||||||
iothread_init();
|
iothread_init();
|
||||||
|
|
||||||
// Create and initialize a request.
|
// Create and initialize a request.
|
||||||
struct spawn_request_t *req = new spawn_request_t();
|
struct spawn_request_t req;
|
||||||
req->handler = handler;
|
req.handler = handler;
|
||||||
req->completionCallback = completionCallback;
|
req.completion = completion;
|
||||||
req->context = context;
|
req.context = context;
|
||||||
|
|
||||||
int local_thread_count = -1;
|
int local_thread_count = -1;
|
||||||
bool spawn_new_thread = false;
|
bool spawn_new_thread = false;
|
||||||
|
@ -195,7 +204,7 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
|
||||||
// Lock around a local region. Note that we can only access s_active_thread_count under the
|
// Lock around a local region. Note that we can only access s_active_thread_count under the
|
||||||
// lock.
|
// lock.
|
||||||
scoped_lock locker(s_spawn_queue_lock);
|
scoped_lock locker(s_spawn_queue_lock);
|
||||||
add_to_queue(req);
|
add_to_queue(std::move(req));
|
||||||
if (s_active_thread_count < IO_MAX_THREADS) {
|
if (s_active_thread_count < IO_MAX_THREADS) {
|
||||||
s_active_thread_count++;
|
s_active_thread_count++;
|
||||||
spawn_new_thread = true;
|
spawn_new_thread = true;
|
||||||
|
@ -297,7 +306,7 @@ static void iothread_service_main_thread_requests(void) {
|
||||||
while (!request_queue.empty()) {
|
while (!request_queue.empty()) {
|
||||||
main_thread_request_t *req = request_queue.front();
|
main_thread_request_t *req = request_queue.front();
|
||||||
request_queue.pop();
|
request_queue.pop();
|
||||||
req->handlerResult = req->handler(req->context);
|
req->handler_result = req->handler(req->context);
|
||||||
req->done = true;
|
req->done = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,10 +324,10 @@ static void iothread_service_main_thread_requests(void) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Service the queue of results */
|
// Service the queue of results
|
||||||
static void iothread_service_result_queue() {
|
static void iothread_service_result_queue() {
|
||||||
// Move the queue to a local variable.
|
// Move the queue to a local variable.
|
||||||
std::queue<spawn_request_t *> result_queue;
|
std::queue<spawn_request_t> result_queue;
|
||||||
{
|
{
|
||||||
scoped_lock queue_lock(s_result_queue_lock);
|
scoped_lock queue_lock(s_result_queue_lock);
|
||||||
std::swap(result_queue, s_result_queue);
|
std::swap(result_queue, s_result_queue);
|
||||||
|
@ -326,12 +335,11 @@ static void iothread_service_result_queue() {
|
||||||
|
|
||||||
// Perform each completion in order. We are responsibile for cleaning them up.
|
// Perform each completion in order. We are responsibile for cleaning them up.
|
||||||
while (!result_queue.empty()) {
|
while (!result_queue.empty()) {
|
||||||
spawn_request_t *req = result_queue.front();
|
spawn_request_t req = std::move(result_queue.front());
|
||||||
result_queue.pop();
|
result_queue.pop();
|
||||||
if (req->completionCallback) {
|
if (req.completion) {
|
||||||
req->completionCallback(req->context, req->handlerResult);
|
req.completion(req.context, req.handler_result);
|
||||||
}
|
}
|
||||||
delete req;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,8 +353,6 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context) {
|
||||||
main_thread_request_t req;
|
main_thread_request_t req;
|
||||||
req.handler = handler;
|
req.handler = handler;
|
||||||
req.context = context;
|
req.context = context;
|
||||||
req.handlerResult = 0;
|
|
||||||
req.done = false;
|
|
||||||
|
|
||||||
// Append it. Do not delete the nested scope as it is crucial to the proper functioning of this
|
// Append it. Do not delete the nested scope as it is crucial to the proper functioning of this
|
||||||
// code by virtue of the lock management.
|
// code by virtue of the lock management.
|
||||||
|
@ -370,5 +376,5 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context) {
|
||||||
|
|
||||||
// Ok, the request must now be done.
|
// Ok, the request must now be done.
|
||||||
assert(req.done);
|
assert(req.done);
|
||||||
return req.handlerResult;
|
return req.handler_result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,9 +29,9 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context);
|
||||||
|
|
||||||
/// Helper templates.
|
/// Helper templates.
|
||||||
template <typename T>
|
template <typename T>
|
||||||
int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context) {
|
int iothread_perform(int (*handler)(T *), void (*completion)(T *, int), T *context) {
|
||||||
return iothread_perform_base((int (*)(void *))handler,
|
return iothread_perform_base((int (*)(void *))handler,
|
||||||
(void (*)(void *, int))completionCallback,
|
(void (*)(void *, int))completion,
|
||||||
static_cast<void *>(context));
|
static_cast<void *>(context));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue