Use fd_event_signaller in iothread completions

This simplifies how iothread notices when there are completions ready to
run.
This commit is contained in:
ridiculousfish 2021-02-06 18:43:43 -08:00
parent 8066428feb
commit e004930947
5 changed files with 64 additions and 88 deletions

View file

@ -971,7 +971,7 @@ static void test_debounce() {
// Wait until the last completion is done.
while (!completion_ran.back()) {
iothread_service_completion();
iothread_service_main();
}
iothread_drain_all();

View file

@ -124,7 +124,7 @@ char_event_t input_event_queue_t::readb() {
// Check for iothread completions only if there is no data to be read from the stdin.
// This gives priority to the foreground.
if (ioport > 0 && FD_ISSET(ioport, &fdset)) {
iothread_service_completion();
iothread_service_main();
if (auto mc = pop_discard_timeouts()) {
return *mc;
}

View file

@ -31,16 +31,9 @@
// which is too low, even tho the system can handle more than 64 threads.
#define IO_MAX_THREADS 1024
// Values for the wakeup bytes sent to the ioport.
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99
#define IO_SERVICE_RESULT_QUEUE 100
// The amount of time an IO thread many hang around to service requests, in milliseconds.
#define IO_WAIT_FOR_WORK_DURATION_MS 500
static void iothread_service_main_thread_requests();
static void iothread_service_result_queue();
using void_function_t = std::function<void()>;
struct work_request_t {
@ -140,32 +133,38 @@ struct thread_pool_t {
/// Leaked to avoid shutdown dtor registration (including tsan).
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));
static owning_lock<std::vector<void_function_t>> s_result_queue;
/// A queue of "things to do on the main thread."
struct main_thread_queue_t {
// Functions to invoke as the completion callback from iothread_perform.
std::vector<void_function_t> completions;
// "Do on main thread" support.
// The queue of main thread requests. This queue contains pointers to structs that are
// stack-allocated on the requesting thread.
static owning_lock<std::vector<main_thread_request_t *>> s_main_thread_request_queue;
// iothread_perform_on_main requests.
// Note this contains pointers to structs that are stack-allocated on the requesting thread.
std::vector<main_thread_request_t *> requests;
// Pipes used for notifying.
struct notify_pipes_t {
int read;
int write;
};
/// \return the (immortal) set of pipes used for notifying of completions.
static const notify_pipes_t &get_notify_pipes() {
static const notify_pipes_t s_notify_pipes = [] {
auto pipes = make_autoclose_pipes();
if (!pipes) {
DIE_WITH_ERRNO("Unable to create iothread notify pipes");
/// Transfer ownership of ourselves to a new queue and return it.
/// 'this' is left empty.
main_thread_queue_t take() {
main_thread_queue_t result;
std::swap(result.completions, this->completions);
std::swap(result.requests, this->requests);
return result;
}
// Mark both ends as non-blocking.
if (make_fd_nonblocking(pipes->read.fd())) wperror(L"fcntl");
if (make_fd_nonblocking(pipes->write.fd())) wperror(L"fcntl");
return notify_pipes_t{pipes->read.acquire(), pipes->write.acquire()};
}();
return s_notify_pipes;
// Moving is allowed, but not copying.
main_thread_queue_t() = default;
main_thread_queue_t(main_thread_queue_t &&) = default;
main_thread_queue_t &operator=(main_thread_queue_t &&) = default;
main_thread_queue_t(const main_thread_queue_t &) = delete;
void operator=(const main_thread_queue_t &) = delete;
};
static owning_lock<main_thread_queue_t> s_main_thread_queue;
/// \return the signaller for completions and main thread requests.
static fd_event_signaller_t &get_notify_signaller() {
// Leaked to avoid shutdown dtors.
static fd_event_signaller_t *s_signaller = new fd_event_signaller_t();
return *s_signaller;
}
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
@ -198,10 +197,8 @@ maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
}
static void enqueue_thread_result(void_function_t req) {
s_result_queue.acquire()->push_back(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
s_main_thread_queue.acquire()->completions.push_back(std::move(req));
get_notify_signaller().post();
}
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
@ -285,23 +282,9 @@ void iothread_perform_impl(void_function_t &&func, void_function_t &&completion,
s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait);
}
int iothread_port() { return get_notify_pipes().read; }
int iothread_port() { return get_notify_signaller().read_fd(); }
void iothread_service_completion() {
ASSERT_IS_MAIN_THREAD();
// Drain the read buffer, and then service completions.
// The order is important.
int port = iothread_port();
char buff[256];
while (read(port, buff, sizeof buff) > 0) {
// pass
}
iothread_service_main_thread_requests();
iothread_service_result_queue();
}
static bool iothread_wait_for_pending_completions(long timeout_usec) {
static bool iothread_wait_for_main_requests(long timeout_usec) {
const long usec_per_sec = 1000000;
struct timeval tv;
tv.tv_sec = timeout_usec / usec_per_sec;
@ -314,9 +297,9 @@ static bool iothread_wait_for_pending_completions(long timeout_usec) {
return ret > 0;
}
void iothread_service_completion_with_timeout(long timeout_usec) {
if (iothread_wait_for_pending_completions(timeout_usec)) {
iothread_service_completion();
void iothread_service_main_with_timeout(long timeout_usec) {
if (iothread_wait_for_main_requests(timeout_usec)) {
iothread_service_main();
}
}
@ -344,9 +327,7 @@ int iothread_drain_all() {
// Nasty polling via select().
while (pool.req_data.acquire()->total_threads > 0) {
if (iothread_wait_for_pending_completions(1000)) {
iothread_service_completion();
}
iothread_service_main_with_timeout(1000);
}
// Clear the drain flag.
@ -364,33 +345,29 @@ int iothread_drain_all() {
return thread_count;
}
/// "Do on main thread" support.
static void iothread_service_main_thread_requests() {
// Service the main thread queue, by invoking any functions enqueued for the main thread.
void iothread_service_main() {
ASSERT_IS_MAIN_THREAD();
// Note the order here is important: we must consume events before handling requests, as posting
// uses the opposite order.
(void)get_notify_signaller().try_consume();
// Move the queue to a local variable.
std::vector<main_thread_request_t *> request_queue;
s_main_thread_request_queue.acquire()->swap(request_queue);
// Note the s_main_thread_queue lock is not held after this.
main_thread_queue_t queue = s_main_thread_queue.acquire()->take();
// Perform each of the functions. Note we are NOT responsible for deleting these. They are
// stack allocated in their respective threads!
for (main_thread_request_t *req : request_queue) {
req->func();
req->done.set_value();
}
}
// Service the queue of results
static void iothread_service_result_queue() {
// Move the queue to a local variable.
std::vector<void_function_t> result_queue;
s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order
for (const auto &func : result_queue) {
// Perform each completion in order.
for (const void_function_t &func : queue.completions) {
// ensure we don't invoke empty functions, that raises an exception
if (func) func();
}
// Perform each main thread request. Note we are NOT responsible for deleting these. They are
// stack allocated in their respective threads!
for (main_thread_request_t *req : queue.requests) {
req->func();
req->done.set_value();
}
}
void iothread_perform_on_main(void_function_t &&func) {
@ -403,12 +380,10 @@ void iothread_perform_on_main(void_function_t &&func) {
main_thread_request_t req(std::move(func));
// Append it. Ensure we don't hold the lock after.
s_main_thread_request_queue.acquire()->push_back(&req);
s_main_thread_queue.acquire()->requests.push_back(&req);
// Tell the pipe and then wait until our future is set.
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
// Tell the signaller and then wait until our future is set.
get_notify_signaller().post();
req.done.get_future().wait();
}

View file

@ -14,11 +14,12 @@
/// \return the fd on which to listen for completion callbacks.
int iothread_port();
/// Services iothread completion callbacks.
void iothread_service_completion();
/// Services iothread main thread completions and requests.
/// This does not block.
void iothread_service_main();
/// Services completions, except does not wait more than \p timeout_usec.
void iothread_service_completion_with_timeout(long timeout_usec);
// Services any main thread requests. Does not wait more than \p timeout_usec.
void iothread_service_main_with_timeout(long timeout_usec);
/// Waits for all iothreads to terminate.
/// \return the number of threads that were running.

View file

@ -2480,9 +2480,9 @@ void reader_data_t::finish_highlighting_before_exec() {
auto deadline = now + sc::milliseconds(kHighlightTimeoutForExecutionMs);
while (now < deadline) {
long timeout_usec = sc::duration_cast<sc::microseconds>(deadline - now).count();
iothread_service_completion_with_timeout(timeout_usec);
iothread_service_main_with_timeout(timeout_usec);
// Note iothread_service_completion_with_timeout will reentrantly modify us,
// Note iothread_service_main_with_timeout will reentrantly modify us,
// by invoking a completion.
if (in_flight_highlight_request.empty()) break;
now = sc::steady_clock::now();