Integrate threads.rs w/ legacy C++ code

Largely routine but for the trampolines in iothread.h and iothread.cpp which
were a real PITA to get correct w/ all their variants.

Integration is complete with all old code ripped out and the tests using the
rust version of the code.
This commit is contained in:
Mahmoud Al-Qudsi 2023-04-25 21:38:53 -05:00
parent 7f9a942f1d
commit 6cd2d0ffed
18 changed files with 292 additions and 580 deletions

View file

@ -47,6 +47,7 @@ fn main() -> miette::Result<()> {
"src/timer.rs", "src/timer.rs",
"src/tokenizer.rs", "src/tokenizer.rs",
"src/topic_monitor.rs", "src/topic_monitor.rs",
"src/threads.rs",
"src/trace.rs", "src/trace.rs",
"src/util.rs", "src/util.rs",
"src/wait_handle.rs", "src/wait_handle.rs",

View file

@ -49,6 +49,86 @@ static NOTIFY_SIGNALLER: once_cell::sync::Lazy<&'static crate::fd_monitor::FdEve
result result
}); });
#[cxx::bridge]
mod ffi {
extern "Rust" {
#[cxx_name = "ASSERT_IS_MAIN_THREAD"]
fn assert_is_main_thread();
#[cxx_name = "ASSERT_IS_BACKGROUND_THREAD"]
fn assert_is_background_thread();
#[cxx_name = "ASSERT_IS_NOT_FORKED_CHILD"]
fn assert_is_not_forked_child();
fn configure_thread_assertions_for_testing();
fn is_main_thread() -> bool;
fn is_forked_child() -> bool;
}
extern "Rust" {
#[cxx_name = "make_detached_pthread"]
fn spawn_ffi(callback: *const u8, param: *const u8) -> bool;
}
extern "Rust" {
fn iothread_port() -> i32;
fn iothread_service_main();
#[cxx_name = "iothread_service_main_with_timeout"]
fn iothread_service_main_with_timeout_ffi(timeout_usec: u64);
#[cxx_name = "iothread_drain_all"]
fn iothread_drain_all_ffi();
#[cxx_name = "iothread_perform"]
fn iothread_perform_ffi(callback: *const u8, param: *const u8);
#[cxx_name = "iothread_perform_cantwait"]
fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8);
}
extern "Rust" {
#[cxx_name = "debounce_t"]
type Debounce;
#[cxx_name = "perform"]
fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64;
#[cxx_name = "perform_with_completion"]
fn perform_with_completion_ffi(
&self,
callback: *const u8,
param1: *const u8,
completion: *const u8,
param2: *const u8,
) -> u64;
#[cxx_name = "new_debounce_t"]
fn new_debounce_ffi(timeout_ms: u64) -> Box<Debounce>;
}
}
fn iothread_service_main_with_timeout_ffi(timeout_usec: u64) {
iothread_service_main_with_timeout(Duration::from_micros(timeout_usec))
}
fn iothread_drain_all_ffi() {
unsafe { iothread_drain_all() }
}
fn iothread_perform_ffi(callback: *const u8, param: *const u8) {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
iothread_perform(move || {
callback(param);
});
}
fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8) {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
iothread_perform_cant_wait(move || {
callback(param);
});
}
/// A [`ThreadPool`] or [`Debounce`] work request. /// A [`ThreadPool`] or [`Debounce`] work request.
type WorkItem = Box<dyn FnOnce() + 'static + Send>; type WorkItem = Box<dyn FnOnce() + 'static + Send>;
@ -131,6 +211,18 @@ pub fn is_forked_child() -> bool {
IS_FORKED_PROC.load(Ordering::Relaxed) IS_FORKED_PROC.load(Ordering::Relaxed)
} }
#[inline(always)]
pub fn assert_is_not_forked_child() {
#[cold]
fn panic_is_forked_child() {
panic!("Function called from forked child!");
}
if is_forked_child() {
panic_is_forked_child();
}
}
/// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a /// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a
/// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe /// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe
/// an unsafe version that doesn't do any lifetime checking akin to /// an unsafe version that doesn't do any lifetime checking akin to
@ -194,6 +286,16 @@ pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
result result
} }
fn spawn_ffi(callback: *const u8, param: *const u8) -> bool {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
spawn(move || {
callback(param);
})
}
/// Data shared between the thread pool [`ThreadPool`] and worker threads [`WorkerThread`]. /// Data shared between the thread pool [`ThreadPool`] and worker threads [`WorkerThread`].
#[derive(Default)] #[derive(Default)]
struct ThreadPoolProtected { struct ThreadPoolProtected {
@ -422,11 +524,12 @@ pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) {
thread_pool.perform(f, true); thread_pool.perform(f, true);
} }
pub fn iothread_port() -> i32 {
i32::from(NOTIFY_SIGNALLER.read_fd())
}
pub fn iothread_service_main_with_timeout(timeout: Duration) { pub fn iothread_service_main_with_timeout(timeout: Duration) {
if crate::fd_readable_set::is_fd_readable( if crate::fd_readable_set::is_fd_readable(iothread_port(), timeout.as_millis() as u64) {
i32::from(NOTIFY_SIGNALLER.read_fd()),
timeout.as_millis() as u64,
) {
iothread_service_main(); iothread_service_main();
} }
} }
@ -491,6 +594,10 @@ struct DebounceData {
start_time: Instant, start_time: Instant,
} }
fn new_debounce_ffi(timeout_ms: u64) -> Box<Debounce> {
Box::new(Debounce::new(Duration::from_millis(timeout_ms)))
}
impl Debounce { impl Debounce {
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> Self {
Self { Self {
@ -538,6 +645,41 @@ impl Debounce {
self.perform_inner(h) self.perform_inner(h)
} }
fn perform_with_completion_ffi(
&self,
callback: *const u8,
param1: *const u8,
completion_callback: *const u8,
param2: *const u8,
) -> u64 {
type Callback = extern "C" fn(crate::ffi::void_ptr) -> crate::ffi::void_ptr;
type CompletionCallback = extern "C" fn(crate::ffi::void_ptr, crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param1 = param1.into();
let completion_callback: CompletionCallback =
unsafe { std::mem::transmute(completion_callback) };
let param2 = param2.into();
self.perform_with_completion(
move || callback(param1),
move |result| completion_callback(param2, result),
)
.into()
}
fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64 {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
self.perform(move || {
callback(param);
})
.into()
}
/// Enqueue `handler` to be performed on a background thread with [`Completion`] `completion` /// Enqueue `handler` to be performed on a background thread with [`Completion`] `completion`
/// to be performed on the main thread. If a function is already enqueued, this overwrites it /// to be performed on the main thread. If a function is already enqueued, this overwrites it
/// and that function will not be executed. /// and that function will not be executed.

View file

@ -1373,24 +1373,6 @@ extern "C" {
} }
} }
void set_main_thread() {
// Just call thread_id() once to force increment of thread_id.
uint64_t tid = thread_id();
assert(tid == 1 && "main thread should have thread ID 1");
(void)tid;
}
void configure_thread_assertions_for_testing() { thread_asserts_cfg_for_testing = true; }
bool is_forked_child() { return is_forked_proc; }
void setup_fork_guards() {
is_forked_proc = false;
static std::once_flag fork_guard_flag;
std::call_once(fork_guard_flag,
[] { pthread_atfork(nullptr, nullptr, [] { is_forked_proc = true; }); });
}
void save_term_foreground_process_group() { initial_fg_process_group = tcgetpgrp(STDIN_FILENO); } void save_term_foreground_process_group() { initial_fg_process_group = tcgetpgrp(STDIN_FILENO); }
void restore_term_foreground_process_group_for_exit() { void restore_term_foreground_process_group_for_exit() {
@ -1407,32 +1389,6 @@ void restore_term_foreground_process_group_for_exit() {
} }
} }
bool is_main_thread() { return thread_id() == 1; }
void assert_is_main_thread(const char *who) {
if (!likely(is_main_thread()) && !unlikely(thread_asserts_cfg_for_testing)) {
FLOGF(error, L"%s called off of main thread.", who);
FLOGF(error, L"Break on debug_thread_error to debug.");
debug_thread_error();
}
}
void assert_is_not_forked_child(const char *who) {
if (unlikely(is_forked_child())) {
FLOGF(error, L"%s called in a forked child.", who);
FLOG(error, L"Break on debug_thread_error to debug.");
debug_thread_error();
}
}
void assert_is_background_thread(const char *who) {
if (unlikely(is_main_thread()) && !unlikely(thread_asserts_cfg_for_testing)) {
FLOGF(error, L"%s called on the main thread (may block!).", who);
FLOG(error, L"Break on debug_thread_error to debug.");
debug_thread_error();
}
}
void assert_is_locked(std::mutex &mutex, const char *who, const char *caller) { void assert_is_locked(std::mutex &mutex, const char *who, const char *caller) {
// Note that std::mutex.try_lock() is allowed to return false when the mutex isn't // Note that std::mutex.try_lock() is allowed to return false when the mutex isn't
// actually locked; fortunately we are checking the opposite so we're safe. // actually locked; fortunately we are checking the opposite so we're safe.

View file

@ -320,14 +320,6 @@ bool should_suppress_stderr_for_tests();
#define likely(x) __builtin_expect(bool(x), 1) #define likely(x) __builtin_expect(bool(x), 1)
#define unlikely(x) __builtin_expect(bool(x), 0) #define unlikely(x) __builtin_expect(bool(x), 0)
void assert_is_main_thread(const char *who);
#define ASSERT_IS_MAIN_THREAD_TRAMPOLINE(x) assert_is_main_thread(x)
#define ASSERT_IS_MAIN_THREAD() ASSERT_IS_MAIN_THREAD_TRAMPOLINE(__FUNCTION__)
void assert_is_background_thread(const char *who);
#define ASSERT_IS_BACKGROUND_THREAD_TRAMPOLINE(x) assert_is_background_thread(x)
#define ASSERT_IS_BACKGROUND_THREAD() ASSERT_IS_BACKGROUND_THREAD_TRAMPOLINE(__FUNCTION__)
/// Useful macro for asserting that a lock is locked. This doesn't check whether this thread locked /// Useful macro for asserting that a lock is locked. This doesn't check whether this thread locked
/// it, which it would be nice if it did, but here it is anyways. /// it, which it would be nice if it did, but here it is anyways.
void assert_is_locked(std::mutex &mutex, const char *who, const char *caller); void assert_is_locked(std::mutex &mutex, const char *who, const char *caller);
@ -538,27 +530,10 @@ wcstring reformat_for_screen(const wcstring &msg, const termsize_t &termsize);
using timepoint_t = double; using timepoint_t = double;
timepoint_t timef(); timepoint_t timef();
/// Call the following function early in main to set the main thread. This is our replacement for
/// pthread_main_np().
void set_main_thread();
bool is_main_thread();
/// Configures thread assertions for testing.
void configure_thread_assertions_for_testing();
/// Set up a guard to complain if we try to do certain things (like take a lock) after calling fork.
void setup_fork_guards(void);
/// Save the value of tcgetpgrp so we can restore it on exit. /// Save the value of tcgetpgrp so we can restore it on exit.
void save_term_foreground_process_group(); void save_term_foreground_process_group();
void restore_term_foreground_process_group_for_exit(); void restore_term_foreground_process_group_for_exit();
/// Return whether we are the child of a fork.
bool is_forked_child(void);
void assert_is_not_forked_child(const char *who);
#define ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(x) assert_is_not_forked_child(x)
#define ASSERT_IS_NOT_FORKED_CHILD() ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(__FUNCTION__)
/// Determines if we are running under Microsoft's Windows Subsystem for Linux to work around /// Determines if we are running under Microsoft's Windows Subsystem for Linux to work around
/// some known limitations and/or bugs. /// some known limitations and/or bugs.
/// See https://github.com/Microsoft/WSL/issues/423 and Microsoft/WSL#2997 /// See https://github.com/Microsoft/WSL/issues/423 and Microsoft/WSL#2997

View file

@ -35,6 +35,7 @@
#include "proc.h" #include "proc.h"
#include "reader.h" #include "reader.h"
#include "termsize.h" #include "termsize.h"
#include "threads.rs.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep

View file

@ -33,6 +33,7 @@
#include "parse_util.h" #include "parse_util.h"
#include "parser.h" #include "parser.h"
#include "path.h" #include "path.h"
#include "threads.rs.h"
#include "util.h" #include "util.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wildcard.h" #include "wildcard.h"

View file

@ -428,8 +428,6 @@ int main(int argc, char **argv) {
int my_optind = 0; int my_optind = 0;
program_name = L"fish"; program_name = L"fish";
set_main_thread();
setup_fork_guards();
rust_init(); rust_init();
signal_unblock_all(); signal_unblock_all();

View file

@ -288,8 +288,6 @@ static std::string no_colorize(const wcstring &text) { return wcs2zstring(text);
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
program_name = L"fish_indent"; program_name = L"fish_indent";
set_main_thread();
setup_fork_guards();
rust_init(); rust_init();
// Using the user's default locale could be a problem if it doesn't use UTF-8 encoding. That's // Using the user's default locale could be a problem if it doesn't use UTF-8 encoding. That's
// because the fish project assumes Unicode UTF-8 encoding in all of its scripts. // because the fish project assumes Unicode UTF-8 encoding in all of its scripts.

View file

@ -272,8 +272,6 @@ static void process_input(bool continuous_mode, bool verbose) {
/// Setup our environment (e.g., tty modes), process key strokes, then reset the environment. /// Setup our environment (e.g., tty modes), process key strokes, then reset the environment.
[[noreturn]] static void setup_and_process_keys(bool continuous_mode, bool verbose) { [[noreturn]] static void setup_and_process_keys(bool continuous_mode, bool verbose) {
set_interactive_session(true); set_interactive_session(true);
set_main_thread();
setup_fork_guards();
rust_init(); rust_init();
env_init(); env_init();
reader_init(); reader_init();

View file

@ -805,7 +805,7 @@ static void test_debounce() {
say(L"Testing debounce"); say(L"Testing debounce");
// Run 8 functions using a condition variable. // Run 8 functions using a condition variable.
// Only the first and last should run. // Only the first and last should run.
debounce_t db; auto db = new_debounce_t(0);
constexpr size_t count = 8; constexpr size_t count = 8;
std::array<bool, count> handler_ran = {}; std::array<bool, count> handler_ran = {};
std::array<bool, count> completion_ran = {}; std::array<bool, count> completion_ran = {};
@ -817,14 +817,14 @@ static void test_debounce() {
// "Enqueue" all functions. Each one waits until ready_to_go. // "Enqueue" all functions. Each one waits until ready_to_go.
for (size_t idx = 0; idx < count; idx++) { for (size_t idx = 0; idx < count; idx++) {
do_test(handler_ran[idx] == false); do_test(handler_ran[idx] == false);
db.perform( std::function<size_t()> performer = [&, idx] {
[&, idx] {
std::unique_lock<std::mutex> lock(m); std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] { return ready_to_go; }); cv.wait(lock, [&] { return ready_to_go; });
handler_ran[idx] = true; handler_ran[idx] = true;
return idx; return idx;
}, };
[&](size_t idx) { completion_ran[idx] = true; }); std::function<void(size_t)> completer = [&](size_t idx) { completion_ran[idx] = true; };
debounce_perform_with_completion(*db, std::move(performer), std::move(completer));
} }
// We're ready to go. // We're ready to go.
@ -863,7 +863,7 @@ static void test_debounce_timeout() {
// Use a shared_ptr so we don't have to join our threads. // Use a shared_ptr so we don't have to join our threads.
const long timeout_ms = 500; const long timeout_ms = 500;
struct data_t { struct data_t {
debounce_t db{timeout_ms}; rust::box<debounce_t> db = new_debounce_t(timeout_ms);
bool exit_ok = false; bool exit_ok = false;
std::mutex m; std::mutex m;
std::condition_variable cv; std::condition_variable cv;
@ -879,14 +879,14 @@ static void test_debounce_timeout() {
}; };
// Spawn the handler twice. This should not modify the thread token. // Spawn the handler twice. This should not modify the thread token.
uint64_t token1 = data->db.perform(handler); uint64_t token1 = debounce_perform(*data->db, handler);
uint64_t token2 = data->db.perform(handler); uint64_t token2 = debounce_perform(*data->db, handler);
do_test(token1 == token2); do_test(token1 == token2);
// Wait 75 msec, then enqueue something else; this should spawn a new thread. // Wait 75 msec, then enqueue something else; this should spawn a new thread.
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2)); std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2));
do_test(data->running == 1); do_test(data->running == 1);
uint64_t token3 = data->db.perform(handler); uint64_t token3 = debounce_perform(*data->db, handler);
do_test(token3 > token2); do_test(token3 > token2);
// Release all the threads. // Release all the threads.
@ -6493,8 +6493,6 @@ int main(int argc, char **argv) {
uname(&uname_info); uname(&uname_info);
say(L"Testing low-level functionality"); say(L"Testing low-level functionality");
set_main_thread();
setup_fork_guards();
rust_init(); rust_init();
proc_init(); proc_init();
env_init(); env_init();

View file

@ -36,6 +36,7 @@
#include "parser.h" #include "parser.h"
#include "path.h" #include "path.h"
#include "redirection.h" #include "redirection.h"
#include "threads.rs.h"
#include "tokenizer.h" #include "tokenizer.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wildcard.h" #include "wildcard.h"

View file

@ -29,6 +29,7 @@
#include "proc.h" #include "proc.h"
#include "reader.h" #include "reader.h"
#include "signals.h" // IWYU pragma: keep #include "signals.h" // IWYU pragma: keep
#include "threads.rs.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep
/// A name for our own key mapping for nul. /// A name for our own key mapping for nul.

View file

@ -21,6 +21,7 @@
#include "maybe.h" #include "maybe.h"
#include "path.h" #include "path.h"
#include "redirection.h" #include "redirection.h"
#include "threads.rs.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep
/// File redirection error message. /// File redirection error message.

View file

@ -2,420 +2,16 @@
#include "iothread.h" #include "iothread.h"
#include <pthread.h> extern "C" const void *iothread_trampoline(const void *c) {
#include <signal.h> iothread_callback_t *callback = (iothread_callback_t *)c;
#include <stdio.h> auto *result = (callback->callback)(callback->param);
delete callback;
#include <atomic>
#include <chrono>
#include <condition_variable> // IWYU pragma: keep
#include <functional>
#include <mutex>
#include <queue>
#include <vector>
#include "common.h"
#include "fallback.h"
#include "fd_readable_set.rs.h"
#include "fds.h"
#include "flog.h"
#include "maybe.h"
/// We just define a thread limit of 1024.
#define IO_MAX_THREADS 1024
// iothread has a thread pool. Sometimes there's no work to do, but extant threads wait around for a
// while (on a condition variable) in case new work comes soon. However condition variables are not
// properly instrumented with Thread Sanitizer, so it fails to recognize when our mutex is locked.
// See https://github.com/google/sanitizers/issues/1259
// When using TSan, disable the wait-around feature.
#ifdef FISH_TSAN_WORKAROUNDS
#define IO_WAIT_FOR_WORK_DURATION_MS 0
#else
#define IO_WAIT_FOR_WORK_DURATION_MS 500
#endif
using void_function_t = std::function<void()>;
namespace {
struct work_request_t : noncopyable_t {
void_function_t handler;
explicit work_request_t(void_function_t &&f) : handler(std::move(f)) {}
};
struct thread_pool_t : noncopyable_t, nonmovable_t {
struct data_t {
/// The queue of outstanding, unclaimed requests.
std::queue<work_request_t> request_queue{};
/// The number of threads that exist in the pool.
size_t total_threads{0};
/// The number of threads which are waiting for more work.
size_t waiting_threads{0};
};
/// Data which needs to be atomically accessed.
owning_lock<data_t> req_data{};
/// The condition variable used to wake up waiting threads.
/// Note this is tied to data's lock.
std::condition_variable queue_cond{};
/// The minimum and maximum number of threads.
/// Here "minimum" means threads that are kept waiting in the pool.
/// Note that the pool is initially empty and threads may decide to exit based on a time wait.
const size_t soft_min_threads;
const size_t max_threads;
/// Construct with a soft minimum and maximum thread count.
thread_pool_t(size_t soft_min_threads, size_t max_threads)
: soft_min_threads(soft_min_threads), max_threads(max_threads) {}
/// Enqueue a new work item onto the thread pool.
/// The function \p func will execute in one of the pool's threads.
/// If \p cant_wait is set, disrespect the thread limit, because extant threads may
/// want to wait for new threads.
int perform(void_function_t &&func, bool cant_wait);
private:
/// The worker loop for this thread.
void *run();
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
/// This runs in the background thread.
maybe_t<work_request_t> dequeue_work_or_commit_to_exit();
/// Trampoline function for pthread_spawn compatibility.
static void *run_trampoline(void *vpool);
/// Attempt to spawn a new pthread.
bool spawn() const;
};
/// The thread pool for "iothreads" which are used to lift I/O off of the main thread.
/// These are used for completions, etc.
/// Leaked to avoid shutdown dtor registration (including tsan).
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));
/// A queue of "things to do on the main thread."
using main_thread_queue_t = std::vector<void_function_t>;
static owning_lock<main_thread_queue_t> s_main_thread_queue;
/// \return the signaller for completions and main thread requests.
static fd_event_signaller_t &get_notify_signaller() {
// Leaked to avoid shutdown dtors.
static auto s_signaller = new fd_event_signaller_t();
return *s_signaller;
}
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
auto data = this->req_data.acquire();
// If the queue is empty, check to see if we should wait.
// We should wait if our exiting would drop us below the soft min.
if (data->request_queue.empty() && data->total_threads == this->soft_min_threads &&
IO_WAIT_FOR_WORK_DURATION_MS > 0) {
data->waiting_threads += 1;
this->queue_cond.wait_for(data.get_lock(),
std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
data->waiting_threads -= 1;
}
// Now that we've perhaps waited, see if there's something on the queue.
maybe_t<work_request_t> result{};
if (!data->request_queue.empty()) {
result = std::move(data->request_queue.front());
data->request_queue.pop();
}
// If we are returning none, then ensure we balance the thread count increment from when we were
// created. This has to be done here in this awkward place because we've already committed to
// exiting - we will never pick up more work. So we need to ensure we decrement the thread count
// while holding the lock as we are effectively exited.
if (!result) {
data->total_threads -= 1;
}
return result; return result;
} }
static intptr_t this_thread() { return (intptr_t)pthread_self(); } extern "C" const void *iothread_trampoline2(const void *c, const void *p) {
iothread_callback_t *callback = (iothread_callback_t *)c;
void *thread_pool_t::run() { auto *result = (callback->callback)(p);
while (auto req = dequeue_work_or_commit_to_exit()) { delete callback;
FLOGF(iothread, L"pthread %p got work", this_thread()); return result;
// Perform the work
req->handler();
} }
FLOGF(iothread, L"pthread %p exiting", this_thread());
return nullptr;
}
void *thread_pool_t::run_trampoline(void *pool) {
assert(pool && "No thread pool given");
return static_cast<thread_pool_t *>(pool)->run();
}
/// Spawn another thread. No lock is held when this is called.
bool thread_pool_t::spawn() const {
return make_detached_pthread(&run_trampoline, const_cast<thread_pool_t *>(this));
}
int thread_pool_t::perform(void_function_t &&func, bool cant_wait) {
assert(func && "Missing function");
// Note we permit an empty completion.
struct work_request_t req(std::move(func));
int local_thread_count = -1;
auto &pool = s_io_thread_pool;
bool spawn_new_thread = false;
bool wakeup_thread = false;
{
// Lock around a local region.
auto data = pool.req_data.acquire();
data->request_queue.push(std::move(req));
FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size());
if (data->waiting_threads >= data->request_queue.size()) {
// There's enough waiting threads, wake one up.
wakeup_thread = true;
} else if (cant_wait || data->total_threads < pool.max_threads) {
// No threads are waiting but we can or must spawn a new thread.
data->total_threads += 1;
spawn_new_thread = true;
}
local_thread_count = data->total_threads;
}
// Kick off the thread if we decided to do so.
if (wakeup_thread) {
FLOGF(iothread, L"notifying thread: %p", this_thread());
pool.queue_cond.notify_one();
}
if (spawn_new_thread) {
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
// unlikely that they are all on the verge of exiting, so one is likely to be ready to
// handle extant requests. So we can ignore failure with some confidence.
if (this->spawn()) {
FLOGF(iothread, L"pthread spawned");
} else {
// We failed to spawn a thread; decrement the thread count.
pool.req_data.acquire()->total_threads -= 1;
}
}
return local_thread_count;
}
} // namespace
void iothread_perform_impl(void_function_t &&func, bool cant_wait) {
ASSERT_IS_NOT_FORKED_CHILD();
s_io_thread_pool.perform(std::move(func), cant_wait);
}
int iothread_port() { return get_notify_signaller().read_fd(); }
void iothread_service_main_with_timeout(uint64_t timeout_usec) {
if (is_fd_readable(iothread_port(), timeout_usec)) {
iothread_service_main();
}
}
/// At the moment, this function is only used in the test suite.
void iothread_drain_all() {
// Nasty polling via select().
while (s_io_thread_pool.req_data.acquire()->total_threads > 0) {
iothread_service_main_with_timeout(1000);
}
}
// Service the main thread queue, by invoking any functions enqueued for the main thread.
void iothread_service_main() {
ASSERT_IS_MAIN_THREAD();
// Note the order here is important: we must consume events before handling requests, as posting
// uses the opposite order.
(void)get_notify_signaller().try_consume();
// Move the queue to a local variable.
// Note the s_main_thread_queue lock is not held after this.
main_thread_queue_t queue;
s_main_thread_queue.acquire()->swap(queue);
// Perform each completion in order.
for (const void_function_t &func : queue) {
// ensure we don't invoke empty functions, that raises an exception
if (func) func();
}
}
bool make_detached_pthread(void *(*func)(void *), void *param) {
// The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and
// then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined
// (#7837). Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux
// but maybe has an effect elsewhere.
sigset_t new_set, saved_set;
sigfillset(&new_set);
sigdelset(&new_set, SIGILL); // bad jump
sigdelset(&new_set, SIGFPE); // divide by zero
sigdelset(&new_set, SIGBUS); // unaligned memory access
sigdelset(&new_set, SIGSEGV); // bad memory access
sigdelset(&new_set, SIGSTOP); // unblockable
sigdelset(&new_set, SIGKILL); // unblockable
DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
// unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
// extant requests. So we can ignore failure with some confidence.
pthread_t thread;
pthread_attr_t thread_attr;
DIE_ON_FAILURE(pthread_attr_init(&thread_attr));
int err = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
if (err == 0) {
err = pthread_create(&thread, &thread_attr, func, param);
if (err == 0) {
FLOGF(iothread, "pthread %d spawned", thread);
} else {
perror("pthread_create");
}
int err2 = pthread_attr_destroy(&thread_attr);
if (err2 != 0) {
perror("pthread_attr_destroy");
err = err2;
}
} else {
perror("pthread_attr_setdetachstate");
}
// Restore our sigmask.
DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, nullptr));
return err == 0;
}
using void_func_t = std::function<void(void)>;
static void *func_invoker(void *param) {
// Acquire a thread id for this thread.
(void)thread_id();
auto vf = static_cast<void_func_t *>(param);
(*vf)();
delete vf;
return nullptr;
}
bool make_detached_pthread(void_func_t &&func) {
// Copy the function into a heap allocation.
auto vf = new void_func_t(std::move(func));
if (make_detached_pthread(func_invoker, vf)) {
return true;
}
// Thread spawning failed, clean up our heap allocation.
delete vf;
return false;
}
static uint64_t next_thread_id() {
// Note 0 is an invalid thread id.
// Note fetch_add is a CAS which returns the value *before* the modification.
static std::atomic<uint64_t> s_last_thread_id{};
uint64_t res = 1 + s_last_thread_id.fetch_add(1, std::memory_order_relaxed);
return res;
}
uint64_t thread_id() {
static FISH_THREAD_LOCAL uint64_t tl_tid = next_thread_id();
return tl_tid;
}
// Debounce implementation note: we would like to enqueue at most one request, except if a thread
// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called
// "abandoned". This is implemented via a monotone uint64 counter, called a token.
// Every time we spawn a thread, increment the token. When the thread is completed, it compares its
// token to the active token; if they differ then this thread was abandoned.
struct debounce_t::impl_t {
// Synchronized data from debounce_t.
struct data_t {
// The (at most 1) next enqueued request, or none if none.
maybe_t<work_request_t> next_req{};
// The token of the current non-abandoned thread, or 0 if no thread is running.
uint64_t active_token{0};
// The next token to use when spawning a thread.
uint64_t next_token{1};
// The start time of the most recently run thread spawn, or request (if any).
std::chrono::time_point<std::chrono::steady_clock> start_time{};
};
owning_lock<data_t> data{};
/// Run an iteration in the background, with the given thread token.
/// \return true if we handled a request, false if there were none.
bool run_next(uint64_t token);
};
bool debounce_t::impl_t::run_next(uint64_t token) {
assert(token > 0 && "Invalid token");
// Note we are on a background thread.
maybe_t<work_request_t> req;
{
auto d = data.acquire();
if (d->next_req) {
// The value was dequeued, we are going to execute it.
req = d->next_req.acquire();
d->start_time = std::chrono::steady_clock::now();
} else {
// There is no request. If we are active, mark ourselves as no longer running.
if (token == d->active_token) {
d->active_token = 0;
}
return false;
}
}
assert(req && req->handler && "Request should have value");
req->handler();
return true;
}
uint64_t debounce_t::perform(std::function<void()> handler) {
uint64_t active_token{0};
bool spawn{false};
// Local lock.
{
auto d = impl_->data.acquire();
d->next_req = work_request_t{std::move(handler)};
// If we have a timeout, and our running thread has exceeded it, abandon that thread.
if (d->active_token && timeout_msec_ > 0 &&
std::chrono::steady_clock::now() - d->start_time >
std::chrono::milliseconds(timeout_msec_)) {
// Abandon this thread by marking nothing as active.
d->active_token = 0;
}
if (!d->active_token) {
// We need to spawn a new thread.
// Mark the current time so that a new request won't immediately abandon us.
spawn = true;
d->active_token = d->next_token++;
d->start_time = std::chrono::steady_clock::now();
}
active_token = d->active_token;
assert(active_token && "Something should be active");
}
if (spawn) {
// Equip our background thread with a reference to impl, to keep it alive.
auto impl = impl_;
iothread_perform([=] {
while (impl->run_next(active_token))
; // pass
});
}
return active_token;
}
// static
void debounce_t::enqueue_main_thread_result(std::function<void()> func) {
s_main_thread_queue.acquire()->push_back(std::move(func));
get_notify_signaller().post();
}
debounce_t::debounce_t(long timeout_msec)
: timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}
debounce_t::~debounce_t() = default;

View file

@ -1,90 +1,123 @@
// Handles IO that may hang. // Handles IO that may hang.
#ifndef FISH_IOTHREAD_H #ifndef FISH_IOTHREAD_H
#define FISH_IOTHREAD_H #define FISH_IOTHREAD_H
#if INCLUDE_RUST_HEADERS
#include <cstdint> // for uint64_t #include <cstdlib>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <utility> #include <utility>
/// \return the fd on which to listen for completion callbacks. #include "threads.rs.h"
int iothread_port();
/// Services iothread main thread completions and requests. struct iothread_callback_t {
/// This does not block. std::function<void *(const void *param)> callback;
void iothread_service_main(); void *param;
// Services any main thread requests. Does not wait more than \p timeout_usec. ~iothread_callback_t() {
void iothread_service_main_with_timeout(uint64_t timeout_usec); if (param) {
free(param);
param = nullptr;
}
}
};
/// Waits for all iothreads to terminate. extern "C" const void *iothread_trampoline(const void *callback);
/// This is a hacky function only used in the test suite. extern "C" const void *iothread_trampoline2(const void *callback, const void *param);
void iothread_drain_all();
// Internal implementation
void iothread_perform_impl(std::function<void()> &&, bool cant_wait = false);
// iothread_perform invokes a handler on a background thread. // iothread_perform invokes a handler on a background thread.
inline void iothread_perform(std::function<void()> &&func) { inline void iothread_perform(std::function<void()> &&func) {
iothread_perform_impl(std::move(func)); auto callback = new iothread_callback_t{std::bind([=] {
func();
return nullptr;
}),
nullptr};
iothread_perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
} }
/// Variant of iothread_perform that disrespects the thread limit. /// Variant of iothread_perform that disrespects the thread limit.
/// It does its best to spawn a new thread if all other threads are occupied. /// It does its best to spawn a new thread if all other threads are occupied.
/// This is for cases where deferring a new thread might lead to deadlock. /// This is for cases where deferring a new thread might lead to deadlock.
inline void iothread_perform_cantwait(std::function<void()> &&func) { inline void iothread_perform_cantwait(std::function<void()> &&func) {
iothread_perform_impl(std::move(func), true); auto callback = new iothread_callback_t{std::bind([=] {
func();
return nullptr;
}),
nullptr};
iothread_perform_cantwait((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
} }
/// Creates a pthread, manipulating the signal mask so that the thread receives no signals. inline uint64_t debounce_perform(const debounce_t &debouncer, const std::function<void()> &func) {
/// The thread is detached. auto callback = new iothread_callback_t{std::bind([=] {
/// The pthread runs \p func. func();
/// \returns true on success, false on failure. return nullptr;
bool make_detached_pthread(void *(*func)(void *), void *param); }),
bool make_detached_pthread(std::function<void()> &&func); nullptr};
/// \returns a thread ID for this thread. return debouncer.perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
/// Thread IDs are never repeated.
uint64_t thread_id();
/// A Debounce is a simple class which executes one function in a background thread,
/// while enqueuing at most one more. New execution requests overwrite the enqueued one.
/// It has an optional timeout; if a handler does not finish within the timeout, then
/// a new thread is spawned.
class debounce_t {
public:
/// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be
/// performed on the main thread. If a function is already enqueued, this overwrites it; that
/// function will not execute.
/// If the function executes, then \p completion will be invoked on the main thread, with the
/// result of the handler.
/// The result is a token which is only of interest to the tests.
template <typename Handler, typename Completion>
uint64_t perform(const Handler &handler, const Completion &completion) {
// Make a trampoline function which calls the handler, puts the result into a shared
// pointer, and then enqueues a completion.
auto trampoline = [=] {
using result_type_t = decltype(handler());
auto result = std::make_shared<result_type_t>(handler());
enqueue_main_thread_result([=] { completion(std::move(*result)); });
};
return perform(std::move(trampoline));
} }
/// One-argument form with no completion. template <typename R>
/// The result is a token which is only of interest to the tests. inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function<R()> &&func,
uint64_t perform(std::function<void()> handler); std::function<void(R)> &&completion) {
auto callback1 = new iothread_callback_t{[=](const void *) {
auto *result = new R(func());
return (void *)result;
},
nullptr};
explicit debounce_t(long timeout_msec = 0); auto callback2 = new iothread_callback_t{
~debounce_t(); ([=](const void *r) {
const R *result = (const R *)r;
private: completion(*result);
/// Helper to enqueue a function to run on the main thread. delete result;
static void enqueue_main_thread_result(std::function<void()> func); return nullptr;
}),
const long timeout_msec_; nullptr,
struct impl_t;
const std::shared_ptr<impl_t> impl_;
}; };
debouncer.perform_with_completion(
(const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1,
(const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2);
}
template <typename R>
inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function<R()> &&func,
std::function<void(const R &)> &&completion) {
auto callback1 = new iothread_callback_t{[=](const void *) {
auto *result = new R(func());
return (void *)result;
},
nullptr};
auto callback2 = new iothread_callback_t{
([=](const void *r) {
const R *result = (const R *)r;
completion(*result);
delete result;
return nullptr;
}),
nullptr,
};
debouncer.perform_with_completion(
(const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1,
(const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2);
}
inline bool make_detached_pthread(const std::function<void()> &func) {
auto callback = new iothread_callback_t{
[=](const void *) {
func();
return nullptr;
},
nullptr,
};
return make_detached_pthread((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
}
#endif
#endif #endif

View file

@ -29,6 +29,7 @@
#include "flog.h" #include "flog.h"
#include "maybe.h" #include "maybe.h"
#include "output.h" #include "output.h"
#include "threads.rs.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep

View file

@ -30,6 +30,7 @@
#include "parse_execution.h" #include "parse_execution.h"
#include "proc.h" #include "proc.h"
#include "signals.h" #include "signals.h"
#include "threads.rs.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep
class io_chain_t; class io_chain_t;

View file

@ -177,19 +177,19 @@ static constexpr long kHighlightTimeoutForExecutionMs = 250;
/// These are deliberately leaked to avoid shutdown dtor registration. /// These are deliberately leaked to avoid shutdown dtor registration.
static debounce_t &debounce_autosuggestions() { static debounce_t &debounce_autosuggestions() {
const long kAutosuggestTimeoutMs = 500; const long kAutosuggestTimeoutMs = 500;
static auto res = new debounce_t(kAutosuggestTimeoutMs); static auto res = new_debounce_t(kAutosuggestTimeoutMs);
return *res; return *res;
} }
static debounce_t &debounce_highlighting() { static debounce_t &debounce_highlighting() {
const long kHighlightTimeoutMs = 500; const long kHighlightTimeoutMs = 500;
static auto res = new debounce_t(kHighlightTimeoutMs); static auto res = new_debounce_t(kHighlightTimeoutMs);
return *res; return *res;
} }
static debounce_t &debounce_history_pager() { static debounce_t &debounce_history_pager() {
const long kHistoryPagerTimeoutMs = 500; const long kHistoryPagerTimeoutMs = 500;
static auto res = new debounce_t(kHistoryPagerTimeoutMs); static auto res = new_debounce_t(kHistoryPagerTimeoutMs);
return *res; return *res;
} }
@ -1333,8 +1333,10 @@ void reader_data_t::fill_history_pager(bool new_search, history_search_direction
} }
const wcstring &search_term = pager.search_field_line.text(); const wcstring &search_term = pager.search_field_line.text();
auto shared_this = this->shared_from_this(); auto shared_this = this->shared_from_this();
debounce_history_pager().perform( std::function<history_pager_result_t()> func = [=]() {
[=]() { return history_pager_search(shared_this->history, direction, index, search_term); }, return history_pager_search(shared_this->history, direction, index, search_term);
};
std::function<void(const history_pager_result_t &)> completion =
[=](const history_pager_result_t &result) { [=](const history_pager_result_t &result) {
if (search_term != shared_this->pager.search_field_line.text()) if (search_term != shared_this->pager.search_field_line.text())
return; // Stale request. return; // Stale request.
@ -1356,7 +1358,9 @@ void reader_data_t::fill_history_pager(bool new_search, history_search_direction
shared_this->select_completion_in_direction(selection_motion_t::next, true); shared_this->select_completion_in_direction(selection_motion_t::next, true);
shared_this->super_highlight_me_plenty(); shared_this->super_highlight_me_plenty();
shared_this->layout_and_repaint(L"history-pager"); shared_this->layout_and_repaint(L"history-pager");
}); };
auto &debouncer = debounce_history_pager();
debounce_perform_with_completion(debouncer, std::move(func), std::move(completion));
} }
void reader_data_t::pager_selection_changed() { void reader_data_t::pager_selection_changed() {
@ -2107,11 +2111,14 @@ void reader_data_t::update_autosuggestion() {
// Clear the autosuggestion and kick it off in the background. // Clear the autosuggestion and kick it off in the background.
FLOG(reader_render, L"Autosuggesting"); FLOG(reader_render, L"Autosuggesting");
autosuggestion.clear(); autosuggestion.clear();
auto performer = get_autosuggestion_performer(parser(), el.text(), el.position(), history); std::function<autosuggestion_t()> performer =
get_autosuggestion_performer(parser(), el.text(), el.position(), history);
auto shared_this = this->shared_from_this(); auto shared_this = this->shared_from_this();
debounce_autosuggestions().perform(performer, [shared_this](autosuggestion_t result) { std::function<void(autosuggestion_t)> completion = [shared_this](autosuggestion_t result) {
shared_this->autosuggest_completed(std::move(result)); shared_this->autosuggest_completed(std::move(result));
}); };
debounce_perform_with_completion(debounce_autosuggestions(), std::move(performer),
std::move(completion));
} }
// Accept any autosuggestion by replacing the command line with it. If full is true, take the whole // Accept any autosuggestion by replacing the command line with it. If full is true, take the whole
@ -2827,11 +2834,14 @@ void reader_data_t::super_highlight_me_plenty() {
in_flight_highlight_request = el->text(); in_flight_highlight_request = el->text();
FLOG(reader_render, L"Highlighting"); FLOG(reader_render, L"Highlighting");
auto highlight_performer = get_highlight_performer(parser(), *el, true /* io_ok */); std::function<highlight_result_t()> highlight_performer =
get_highlight_performer(parser(), *el, true /* io_ok */);
auto shared_this = this->shared_from_this(); auto shared_this = this->shared_from_this();
debounce_highlighting().perform(highlight_performer, [shared_this](highlight_result_t result) { std::function<void(highlight_result_t)> completion = [shared_this](highlight_result_t result) {
shared_this->highlight_complete(std::move(result)); shared_this->highlight_complete(std::move(result));
}); };
debounce_perform_with_completion(debounce_highlighting(), std::move(highlight_performer),
std::move(completion));
} }
void reader_data_t::finish_highlighting_before_exec() { void reader_data_t::finish_highlighting_before_exec() {