diff --git a/fish-rust/build.rs b/fish-rust/build.rs index f0f80dc26..02d289433 100644 --- a/fish-rust/build.rs +++ b/fish-rust/build.rs @@ -47,6 +47,7 @@ fn main() -> miette::Result<()> { "src/timer.rs", "src/tokenizer.rs", "src/topic_monitor.rs", + "src/threads.rs", "src/trace.rs", "src/util.rs", "src/wait_handle.rs", diff --git a/fish-rust/src/threads.rs b/fish-rust/src/threads.rs index 0107a4de6..7fa0fed43 100644 --- a/fish-rust/src/threads.rs +++ b/fish-rust/src/threads.rs @@ -49,6 +49,86 @@ static NOTIFY_SIGNALLER: once_cell::sync::Lazy<&'static crate::fd_monitor::FdEve result }); +#[cxx::bridge] +mod ffi { + extern "Rust" { + #[cxx_name = "ASSERT_IS_MAIN_THREAD"] + fn assert_is_main_thread(); + #[cxx_name = "ASSERT_IS_BACKGROUND_THREAD"] + fn assert_is_background_thread(); + #[cxx_name = "ASSERT_IS_NOT_FORKED_CHILD"] + fn assert_is_not_forked_child(); + fn configure_thread_assertions_for_testing(); + fn is_main_thread() -> bool; + fn is_forked_child() -> bool; + } + + extern "Rust" { + #[cxx_name = "make_detached_pthread"] + fn spawn_ffi(callback: *const u8, param: *const u8) -> bool; + } + + extern "Rust" { + fn iothread_port() -> i32; + fn iothread_service_main(); + #[cxx_name = "iothread_service_main_with_timeout"] + fn iothread_service_main_with_timeout_ffi(timeout_usec: u64); + #[cxx_name = "iothread_drain_all"] + fn iothread_drain_all_ffi(); + #[cxx_name = "iothread_perform"] + fn iothread_perform_ffi(callback: *const u8, param: *const u8); + #[cxx_name = "iothread_perform_cantwait"] + fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8); + } + + extern "Rust" { + #[cxx_name = "debounce_t"] + type Debounce; + + #[cxx_name = "perform"] + fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64; + #[cxx_name = "perform_with_completion"] + fn perform_with_completion_ffi( + &self, + callback: *const u8, + param1: *const u8, + completion: *const u8, + param2: *const u8, + ) -> u64; + + #[cxx_name = "new_debounce_t"] + fn new_debounce_ffi(timeout_ms: u64) -> Box; + } +} + +fn iothread_service_main_with_timeout_ffi(timeout_usec: u64) { + iothread_service_main_with_timeout(Duration::from_micros(timeout_usec)) +} + +fn iothread_drain_all_ffi() { + unsafe { iothread_drain_all() } +} + +fn iothread_perform_ffi(callback: *const u8, param: *const u8) { + type Callback = extern "C" fn(crate::ffi::void_ptr); + let callback: Callback = unsafe { std::mem::transmute(callback) }; + let param = param.into(); + + iothread_perform(move || { + callback(param); + }); +} + +fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8) { + type Callback = extern "C" fn(crate::ffi::void_ptr); + let callback: Callback = unsafe { std::mem::transmute(callback) }; + let param = param.into(); + + iothread_perform_cant_wait(move || { + callback(param); + }); +} + /// A [`ThreadPool`] or [`Debounce`] work request. type WorkItem = Box; @@ -131,6 +211,18 @@ pub fn is_forked_child() -> bool { IS_FORKED_PROC.load(Ordering::Relaxed) } +#[inline(always)] +pub fn assert_is_not_forked_child() { + #[cold] + fn panic_is_forked_child() { + panic!("Function called from forked child!"); + } + + if is_forked_child() { + panic_is_forked_child(); + } +} + /// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a /// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe /// an unsafe version that doesn't do any lifetime checking akin to @@ -194,6 +286,16 @@ pub fn spawn(callback: F) -> bool { result } +fn spawn_ffi(callback: *const u8, param: *const u8) -> bool { + type Callback = extern "C" fn(crate::ffi::void_ptr); + let callback: Callback = unsafe { std::mem::transmute(callback) }; + let param = param.into(); + + spawn(move || { + callback(param); + }) +} + /// Data shared between the thread pool [`ThreadPool`] and worker threads [`WorkerThread`]. #[derive(Default)] struct ThreadPoolProtected { @@ -422,11 +524,12 @@ pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) { thread_pool.perform(f, true); } +pub fn iothread_port() -> i32 { + i32::from(NOTIFY_SIGNALLER.read_fd()) +} + pub fn iothread_service_main_with_timeout(timeout: Duration) { - if crate::fd_readable_set::is_fd_readable( - i32::from(NOTIFY_SIGNALLER.read_fd()), - timeout.as_millis() as u64, - ) { + if crate::fd_readable_set::is_fd_readable(iothread_port(), timeout.as_millis() as u64) { iothread_service_main(); } } @@ -491,6 +594,10 @@ struct DebounceData { start_time: Instant, } +fn new_debounce_ffi(timeout_ms: u64) -> Box { + Box::new(Debounce::new(Duration::from_millis(timeout_ms))) +} + impl Debounce { pub fn new(timeout: Duration) -> Self { Self { @@ -538,6 +645,41 @@ impl Debounce { self.perform_inner(h) } + fn perform_with_completion_ffi( + &self, + callback: *const u8, + param1: *const u8, + completion_callback: *const u8, + param2: *const u8, + ) -> u64 { + type Callback = extern "C" fn(crate::ffi::void_ptr) -> crate::ffi::void_ptr; + type CompletionCallback = extern "C" fn(crate::ffi::void_ptr, crate::ffi::void_ptr); + + let callback: Callback = unsafe { std::mem::transmute(callback) }; + let param1 = param1.into(); + let completion_callback: CompletionCallback = + unsafe { std::mem::transmute(completion_callback) }; + let param2 = param2.into(); + + self.perform_with_completion( + move || callback(param1), + move |result| completion_callback(param2, result), + ) + .into() + } + + fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64 { + type Callback = extern "C" fn(crate::ffi::void_ptr); + + let callback: Callback = unsafe { std::mem::transmute(callback) }; + let param = param.into(); + + self.perform(move || { + callback(param); + }) + .into() + } + /// Enqueue `handler` to be performed on a background thread with [`Completion`] `completion` /// to be performed on the main thread. If a function is already enqueued, this overwrites it /// and that function will not be executed. diff --git a/src/common.cpp b/src/common.cpp index 54ca4b9c4..ffc0b2e23 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -1373,24 +1373,6 @@ extern "C" { } } -void set_main_thread() { - // Just call thread_id() once to force increment of thread_id. - uint64_t tid = thread_id(); - assert(tid == 1 && "main thread should have thread ID 1"); - (void)tid; -} - -void configure_thread_assertions_for_testing() { thread_asserts_cfg_for_testing = true; } - -bool is_forked_child() { return is_forked_proc; } - -void setup_fork_guards() { - is_forked_proc = false; - static std::once_flag fork_guard_flag; - std::call_once(fork_guard_flag, - [] { pthread_atfork(nullptr, nullptr, [] { is_forked_proc = true; }); }); -} - void save_term_foreground_process_group() { initial_fg_process_group = tcgetpgrp(STDIN_FILENO); } void restore_term_foreground_process_group_for_exit() { @@ -1407,32 +1389,6 @@ void restore_term_foreground_process_group_for_exit() { } } -bool is_main_thread() { return thread_id() == 1; } - -void assert_is_main_thread(const char *who) { - if (!likely(is_main_thread()) && !unlikely(thread_asserts_cfg_for_testing)) { - FLOGF(error, L"%s called off of main thread.", who); - FLOGF(error, L"Break on debug_thread_error to debug."); - debug_thread_error(); - } -} - -void assert_is_not_forked_child(const char *who) { - if (unlikely(is_forked_child())) { - FLOGF(error, L"%s called in a forked child.", who); - FLOG(error, L"Break on debug_thread_error to debug."); - debug_thread_error(); - } -} - -void assert_is_background_thread(const char *who) { - if (unlikely(is_main_thread()) && !unlikely(thread_asserts_cfg_for_testing)) { - FLOGF(error, L"%s called on the main thread (may block!).", who); - FLOG(error, L"Break on debug_thread_error to debug."); - debug_thread_error(); - } -} - void assert_is_locked(std::mutex &mutex, const char *who, const char *caller) { // Note that std::mutex.try_lock() is allowed to return false when the mutex isn't // actually locked; fortunately we are checking the opposite so we're safe. diff --git a/src/common.h b/src/common.h index 4fec83f2e..e2c6f2713 100644 --- a/src/common.h +++ b/src/common.h @@ -320,14 +320,6 @@ bool should_suppress_stderr_for_tests(); #define likely(x) __builtin_expect(bool(x), 1) #define unlikely(x) __builtin_expect(bool(x), 0) -void assert_is_main_thread(const char *who); -#define ASSERT_IS_MAIN_THREAD_TRAMPOLINE(x) assert_is_main_thread(x) -#define ASSERT_IS_MAIN_THREAD() ASSERT_IS_MAIN_THREAD_TRAMPOLINE(__FUNCTION__) - -void assert_is_background_thread(const char *who); -#define ASSERT_IS_BACKGROUND_THREAD_TRAMPOLINE(x) assert_is_background_thread(x) -#define ASSERT_IS_BACKGROUND_THREAD() ASSERT_IS_BACKGROUND_THREAD_TRAMPOLINE(__FUNCTION__) - /// Useful macro for asserting that a lock is locked. This doesn't check whether this thread locked /// it, which it would be nice if it did, but here it is anyways. void assert_is_locked(std::mutex &mutex, const char *who, const char *caller); @@ -538,27 +530,10 @@ wcstring reformat_for_screen(const wcstring &msg, const termsize_t &termsize); using timepoint_t = double; timepoint_t timef(); -/// Call the following function early in main to set the main thread. This is our replacement for -/// pthread_main_np(). -void set_main_thread(); -bool is_main_thread(); - -/// Configures thread assertions for testing. -void configure_thread_assertions_for_testing(); - -/// Set up a guard to complain if we try to do certain things (like take a lock) after calling fork. -void setup_fork_guards(void); - /// Save the value of tcgetpgrp so we can restore it on exit. void save_term_foreground_process_group(); void restore_term_foreground_process_group_for_exit(); -/// Return whether we are the child of a fork. -bool is_forked_child(void); -void assert_is_not_forked_child(const char *who); -#define ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(x) assert_is_not_forked_child(x) -#define ASSERT_IS_NOT_FORKED_CHILD() ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(__FUNCTION__) - /// Determines if we are running under Microsoft's Windows Subsystem for Linux to work around /// some known limitations and/or bugs. /// See https://github.com/Microsoft/WSL/issues/423 and Microsoft/WSL#2997 diff --git a/src/env.cpp b/src/env.cpp index 363cd3c00..ffe631fb8 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -35,6 +35,7 @@ #include "proc.h" #include "reader.h" #include "termsize.h" +#include "threads.rs.h" #include "wcstringutil.h" #include "wutil.h" // IWYU pragma: keep diff --git a/src/expand.cpp b/src/expand.cpp index 06e98b978..636cf931b 100644 --- a/src/expand.cpp +++ b/src/expand.cpp @@ -33,6 +33,7 @@ #include "parse_util.h" #include "parser.h" #include "path.h" +#include "threads.rs.h" #include "util.h" #include "wcstringutil.h" #include "wildcard.h" diff --git a/src/fish.cpp b/src/fish.cpp index 637a17510..0176985d7 100644 --- a/src/fish.cpp +++ b/src/fish.cpp @@ -428,8 +428,6 @@ int main(int argc, char **argv) { int my_optind = 0; program_name = L"fish"; - set_main_thread(); - setup_fork_guards(); rust_init(); signal_unblock_all(); diff --git a/src/fish_indent.cpp b/src/fish_indent.cpp index 4867ae488..dd46abddb 100644 --- a/src/fish_indent.cpp +++ b/src/fish_indent.cpp @@ -288,8 +288,6 @@ static std::string no_colorize(const wcstring &text) { return wcs2zstring(text); int main(int argc, char *argv[]) { program_name = L"fish_indent"; - set_main_thread(); - setup_fork_guards(); rust_init(); // Using the user's default locale could be a problem if it doesn't use UTF-8 encoding. That's // because the fish project assumes Unicode UTF-8 encoding in all of its scripts. diff --git a/src/fish_key_reader.cpp b/src/fish_key_reader.cpp index e70997814..186cb85a0 100644 --- a/src/fish_key_reader.cpp +++ b/src/fish_key_reader.cpp @@ -272,8 +272,6 @@ static void process_input(bool continuous_mode, bool verbose) { /// Setup our environment (e.g., tty modes), process key strokes, then reset the environment. [[noreturn]] static void setup_and_process_keys(bool continuous_mode, bool verbose) { set_interactive_session(true); - set_main_thread(); - setup_fork_guards(); rust_init(); env_init(); reader_init(); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index f4f3179bd..0ba5c6c70 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -805,7 +805,7 @@ static void test_debounce() { say(L"Testing debounce"); // Run 8 functions using a condition variable. // Only the first and last should run. - debounce_t db; + auto db = new_debounce_t(0); constexpr size_t count = 8; std::array handler_ran = {}; std::array completion_ran = {}; @@ -817,14 +817,14 @@ static void test_debounce() { // "Enqueue" all functions. Each one waits until ready_to_go. for (size_t idx = 0; idx < count; idx++) { do_test(handler_ran[idx] == false); - db.perform( - [&, idx] { - std::unique_lock lock(m); - cv.wait(lock, [&] { return ready_to_go; }); - handler_ran[idx] = true; - return idx; - }, - [&](size_t idx) { completion_ran[idx] = true; }); + std::function performer = [&, idx] { + std::unique_lock lock(m); + cv.wait(lock, [&] { return ready_to_go; }); + handler_ran[idx] = true; + return idx; + }; + std::function completer = [&](size_t idx) { completion_ran[idx] = true; }; + debounce_perform_with_completion(*db, std::move(performer), std::move(completer)); } // We're ready to go. @@ -863,7 +863,7 @@ static void test_debounce_timeout() { // Use a shared_ptr so we don't have to join our threads. const long timeout_ms = 500; struct data_t { - debounce_t db{timeout_ms}; + rust::box db = new_debounce_t(timeout_ms); bool exit_ok = false; std::mutex m; std::condition_variable cv; @@ -879,14 +879,14 @@ static void test_debounce_timeout() { }; // Spawn the handler twice. This should not modify the thread token. - uint64_t token1 = data->db.perform(handler); - uint64_t token2 = data->db.perform(handler); + uint64_t token1 = debounce_perform(*data->db, handler); + uint64_t token2 = debounce_perform(*data->db, handler); do_test(token1 == token2); // Wait 75 msec, then enqueue something else; this should spawn a new thread. std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2)); do_test(data->running == 1); - uint64_t token3 = data->db.perform(handler); + uint64_t token3 = debounce_perform(*data->db, handler); do_test(token3 > token2); // Release all the threads. @@ -6493,8 +6493,6 @@ int main(int argc, char **argv) { uname(&uname_info); say(L"Testing low-level functionality"); - set_main_thread(); - setup_fork_guards(); rust_init(); proc_init(); env_init(); diff --git a/src/highlight.cpp b/src/highlight.cpp index 91cdfe99b..1d805e283 100644 --- a/src/highlight.cpp +++ b/src/highlight.cpp @@ -36,6 +36,7 @@ #include "parser.h" #include "path.h" #include "redirection.h" +#include "threads.rs.h" #include "tokenizer.h" #include "wcstringutil.h" #include "wildcard.h" diff --git a/src/input.cpp b/src/input.cpp index d610d0dd8..5af8202ba 100644 --- a/src/input.cpp +++ b/src/input.cpp @@ -29,7 +29,8 @@ #include "proc.h" #include "reader.h" #include "signals.h" // IWYU pragma: keep -#include "wutil.h" // IWYU pragma: keep +#include "threads.rs.h" +#include "wutil.h" // IWYU pragma: keep /// A name for our own key mapping for nul. static const wchar_t *k_nul_mapping_name = L"nul"; diff --git a/src/io.cpp b/src/io.cpp index 2cbf32197..061e9fe52 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -21,6 +21,7 @@ #include "maybe.h" #include "path.h" #include "redirection.h" +#include "threads.rs.h" #include "wutil.h" // IWYU pragma: keep /// File redirection error message. diff --git a/src/iothread.cpp b/src/iothread.cpp index 62fd6c6e4..0d12dd700 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -2,420 +2,16 @@ #include "iothread.h" -#include -#include -#include - -#include -#include -#include // IWYU pragma: keep -#include -#include -#include -#include - -#include "common.h" -#include "fallback.h" -#include "fd_readable_set.rs.h" -#include "fds.h" -#include "flog.h" -#include "maybe.h" - -/// We just define a thread limit of 1024. -#define IO_MAX_THREADS 1024 - -// iothread has a thread pool. Sometimes there's no work to do, but extant threads wait around for a -// while (on a condition variable) in case new work comes soon. However condition variables are not -// properly instrumented with Thread Sanitizer, so it fails to recognize when our mutex is locked. -// See https://github.com/google/sanitizers/issues/1259 -// When using TSan, disable the wait-around feature. -#ifdef FISH_TSAN_WORKAROUNDS -#define IO_WAIT_FOR_WORK_DURATION_MS 0 -#else -#define IO_WAIT_FOR_WORK_DURATION_MS 500 -#endif - -using void_function_t = std::function; - -namespace { -struct work_request_t : noncopyable_t { - void_function_t handler; - explicit work_request_t(void_function_t &&f) : handler(std::move(f)) {} -}; - -struct thread_pool_t : noncopyable_t, nonmovable_t { - struct data_t { - /// The queue of outstanding, unclaimed requests. - std::queue request_queue{}; - - /// The number of threads that exist in the pool. - size_t total_threads{0}; - - /// The number of threads which are waiting for more work. - size_t waiting_threads{0}; - }; - - /// Data which needs to be atomically accessed. - owning_lock req_data{}; - - /// The condition variable used to wake up waiting threads. - /// Note this is tied to data's lock. - std::condition_variable queue_cond{}; - - /// The minimum and maximum number of threads. - /// Here "minimum" means threads that are kept waiting in the pool. - /// Note that the pool is initially empty and threads may decide to exit based on a time wait. - const size_t soft_min_threads; - const size_t max_threads; - - /// Construct with a soft minimum and maximum thread count. - thread_pool_t(size_t soft_min_threads, size_t max_threads) - : soft_min_threads(soft_min_threads), max_threads(max_threads) {} - - /// Enqueue a new work item onto the thread pool. - /// The function \p func will execute in one of the pool's threads. - /// If \p cant_wait is set, disrespect the thread limit, because extant threads may - /// want to wait for new threads. - int perform(void_function_t &&func, bool cant_wait); - - private: - /// The worker loop for this thread. - void *run(); - - /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by - /// reducing the active thread count. - /// This runs in the background thread. - maybe_t dequeue_work_or_commit_to_exit(); - - /// Trampoline function for pthread_spawn compatibility. - static void *run_trampoline(void *vpool); - - /// Attempt to spawn a new pthread. - bool spawn() const; -}; - -/// The thread pool for "iothreads" which are used to lift I/O off of the main thread. -/// These are used for completions, etc. -/// Leaked to avoid shutdown dtor registration (including tsan). -static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS)); - -/// A queue of "things to do on the main thread." -using main_thread_queue_t = std::vector; -static owning_lock s_main_thread_queue; - -/// \return the signaller for completions and main thread requests. -static fd_event_signaller_t &get_notify_signaller() { - // Leaked to avoid shutdown dtors. - static auto s_signaller = new fd_event_signaller_t(); - return *s_signaller; -} - -/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by -/// reducing the active thread count. -maybe_t thread_pool_t::dequeue_work_or_commit_to_exit() { - auto data = this->req_data.acquire(); - // If the queue is empty, check to see if we should wait. - // We should wait if our exiting would drop us below the soft min. - if (data->request_queue.empty() && data->total_threads == this->soft_min_threads && - IO_WAIT_FOR_WORK_DURATION_MS > 0) { - data->waiting_threads += 1; - this->queue_cond.wait_for(data.get_lock(), - std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS)); - data->waiting_threads -= 1; - } - - // Now that we've perhaps waited, see if there's something on the queue. - maybe_t result{}; - if (!data->request_queue.empty()) { - result = std::move(data->request_queue.front()); - data->request_queue.pop(); - } - // If we are returning none, then ensure we balance the thread count increment from when we were - // created. This has to be done here in this awkward place because we've already committed to - // exiting - we will never pick up more work. So we need to ensure we decrement the thread count - // while holding the lock as we are effectively exited. - if (!result) { - data->total_threads -= 1; - } +extern "C" const void *iothread_trampoline(const void *c) { + iothread_callback_t *callback = (iothread_callback_t *)c; + auto *result = (callback->callback)(callback->param); + delete callback; return result; } -static intptr_t this_thread() { return (intptr_t)pthread_self(); } - -void *thread_pool_t::run() { - while (auto req = dequeue_work_or_commit_to_exit()) { - FLOGF(iothread, L"pthread %p got work", this_thread()); - // Perform the work - req->handler(); - } - FLOGF(iothread, L"pthread %p exiting", this_thread()); - return nullptr; +extern "C" const void *iothread_trampoline2(const void *c, const void *p) { + iothread_callback_t *callback = (iothread_callback_t *)c; + auto *result = (callback->callback)(p); + delete callback; + return result; } - -void *thread_pool_t::run_trampoline(void *pool) { - assert(pool && "No thread pool given"); - return static_cast(pool)->run(); -} - -/// Spawn another thread. No lock is held when this is called. -bool thread_pool_t::spawn() const { - return make_detached_pthread(&run_trampoline, const_cast(this)); -} - -int thread_pool_t::perform(void_function_t &&func, bool cant_wait) { - assert(func && "Missing function"); - // Note we permit an empty completion. - struct work_request_t req(std::move(func)); - int local_thread_count = -1; - auto &pool = s_io_thread_pool; - bool spawn_new_thread = false; - bool wakeup_thread = false; - { - // Lock around a local region. - auto data = pool.req_data.acquire(); - data->request_queue.push(std::move(req)); - FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size()); - if (data->waiting_threads >= data->request_queue.size()) { - // There's enough waiting threads, wake one up. - wakeup_thread = true; - } else if (cant_wait || data->total_threads < pool.max_threads) { - // No threads are waiting but we can or must spawn a new thread. - data->total_threads += 1; - spawn_new_thread = true; - } - local_thread_count = data->total_threads; - } - - // Kick off the thread if we decided to do so. - if (wakeup_thread) { - FLOGF(iothread, L"notifying thread: %p", this_thread()); - pool.queue_cond.notify_one(); - } - if (spawn_new_thread) { - // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very - // unlikely that they are all on the verge of exiting, so one is likely to be ready to - // handle extant requests. So we can ignore failure with some confidence. - if (this->spawn()) { - FLOGF(iothread, L"pthread spawned"); - } else { - // We failed to spawn a thread; decrement the thread count. - pool.req_data.acquire()->total_threads -= 1; - } - } - return local_thread_count; -} -} // namespace - -void iothread_perform_impl(void_function_t &&func, bool cant_wait) { - ASSERT_IS_NOT_FORKED_CHILD(); - s_io_thread_pool.perform(std::move(func), cant_wait); -} - -int iothread_port() { return get_notify_signaller().read_fd(); } - -void iothread_service_main_with_timeout(uint64_t timeout_usec) { - if (is_fd_readable(iothread_port(), timeout_usec)) { - iothread_service_main(); - } -} - -/// At the moment, this function is only used in the test suite. -void iothread_drain_all() { - // Nasty polling via select(). - while (s_io_thread_pool.req_data.acquire()->total_threads > 0) { - iothread_service_main_with_timeout(1000); - } -} - -// Service the main thread queue, by invoking any functions enqueued for the main thread. -void iothread_service_main() { - ASSERT_IS_MAIN_THREAD(); - // Note the order here is important: we must consume events before handling requests, as posting - // uses the opposite order. - (void)get_notify_signaller().try_consume(); - - // Move the queue to a local variable. - // Note the s_main_thread_queue lock is not held after this. - main_thread_queue_t queue; - s_main_thread_queue.acquire()->swap(queue); - - // Perform each completion in order. - for (const void_function_t &func : queue) { - // ensure we don't invoke empty functions, that raises an exception - if (func) func(); - } -} - -bool make_detached_pthread(void *(*func)(void *), void *param) { - // The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and - // then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined - // (#7837). Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux - // but maybe has an effect elsewhere. - sigset_t new_set, saved_set; - sigfillset(&new_set); - sigdelset(&new_set, SIGILL); // bad jump - sigdelset(&new_set, SIGFPE); // divide by zero - sigdelset(&new_set, SIGBUS); // unaligned memory access - sigdelset(&new_set, SIGSEGV); // bad memory access - sigdelset(&new_set, SIGSTOP); // unblockable - sigdelset(&new_set, SIGKILL); // unblockable - DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set)); - - // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very - // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle - // extant requests. So we can ignore failure with some confidence. - pthread_t thread; - pthread_attr_t thread_attr; - DIE_ON_FAILURE(pthread_attr_init(&thread_attr)); - - int err = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); - if (err == 0) { - err = pthread_create(&thread, &thread_attr, func, param); - if (err == 0) { - FLOGF(iothread, "pthread %d spawned", thread); - } else { - perror("pthread_create"); - } - int err2 = pthread_attr_destroy(&thread_attr); - if (err2 != 0) { - perror("pthread_attr_destroy"); - err = err2; - } - } else { - perror("pthread_attr_setdetachstate"); - } - // Restore our sigmask. - DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, nullptr)); - return err == 0; -} - -using void_func_t = std::function; - -static void *func_invoker(void *param) { - // Acquire a thread id for this thread. - (void)thread_id(); - auto vf = static_cast(param); - (*vf)(); - delete vf; - return nullptr; -} - -bool make_detached_pthread(void_func_t &&func) { - // Copy the function into a heap allocation. - auto vf = new void_func_t(std::move(func)); - if (make_detached_pthread(func_invoker, vf)) { - return true; - } - // Thread spawning failed, clean up our heap allocation. - delete vf; - return false; -} - -static uint64_t next_thread_id() { - // Note 0 is an invalid thread id. - // Note fetch_add is a CAS which returns the value *before* the modification. - static std::atomic s_last_thread_id{}; - uint64_t res = 1 + s_last_thread_id.fetch_add(1, std::memory_order_relaxed); - return res; -} - -uint64_t thread_id() { - static FISH_THREAD_LOCAL uint64_t tl_tid = next_thread_id(); - return tl_tid; -} - -// Debounce implementation note: we would like to enqueue at most one request, except if a thread -// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called -// "abandoned". This is implemented via a monotone uint64 counter, called a token. -// Every time we spawn a thread, increment the token. When the thread is completed, it compares its -// token to the active token; if they differ then this thread was abandoned. -struct debounce_t::impl_t { - // Synchronized data from debounce_t. - struct data_t { - // The (at most 1) next enqueued request, or none if none. - maybe_t next_req{}; - - // The token of the current non-abandoned thread, or 0 if no thread is running. - uint64_t active_token{0}; - - // The next token to use when spawning a thread. - uint64_t next_token{1}; - - // The start time of the most recently run thread spawn, or request (if any). - std::chrono::time_point start_time{}; - }; - owning_lock data{}; - - /// Run an iteration in the background, with the given thread token. - /// \return true if we handled a request, false if there were none. - bool run_next(uint64_t token); -}; - -bool debounce_t::impl_t::run_next(uint64_t token) { - assert(token > 0 && "Invalid token"); - // Note we are on a background thread. - maybe_t req; - { - auto d = data.acquire(); - if (d->next_req) { - // The value was dequeued, we are going to execute it. - req = d->next_req.acquire(); - d->start_time = std::chrono::steady_clock::now(); - } else { - // There is no request. If we are active, mark ourselves as no longer running. - if (token == d->active_token) { - d->active_token = 0; - } - return false; - } - } - - assert(req && req->handler && "Request should have value"); - req->handler(); - return true; -} - -uint64_t debounce_t::perform(std::function handler) { - uint64_t active_token{0}; - bool spawn{false}; - // Local lock. - { - auto d = impl_->data.acquire(); - d->next_req = work_request_t{std::move(handler)}; - // If we have a timeout, and our running thread has exceeded it, abandon that thread. - if (d->active_token && timeout_msec_ > 0 && - std::chrono::steady_clock::now() - d->start_time > - std::chrono::milliseconds(timeout_msec_)) { - // Abandon this thread by marking nothing as active. - d->active_token = 0; - } - if (!d->active_token) { - // We need to spawn a new thread. - // Mark the current time so that a new request won't immediately abandon us. - spawn = true; - d->active_token = d->next_token++; - d->start_time = std::chrono::steady_clock::now(); - } - active_token = d->active_token; - assert(active_token && "Something should be active"); - } - if (spawn) { - // Equip our background thread with a reference to impl, to keep it alive. - auto impl = impl_; - iothread_perform([=] { - while (impl->run_next(active_token)) - ; // pass - }); - } - return active_token; -} - -// static -void debounce_t::enqueue_main_thread_result(std::function func) { - s_main_thread_queue.acquire()->push_back(std::move(func)); - get_notify_signaller().post(); -} - -debounce_t::debounce_t(long timeout_msec) - : timeout_msec_(timeout_msec), impl_(std::make_shared()) {} -debounce_t::~debounce_t() = default; diff --git a/src/iothread.h b/src/iothread.h index c2797a6c4..2755db112 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -1,90 +1,123 @@ // Handles IO that may hang. #ifndef FISH_IOTHREAD_H #define FISH_IOTHREAD_H +#if INCLUDE_RUST_HEADERS -#include // for uint64_t +#include #include #include #include -/// \return the fd on which to listen for completion callbacks. -int iothread_port(); +#include "threads.rs.h" -/// Services iothread main thread completions and requests. -/// This does not block. -void iothread_service_main(); +struct iothread_callback_t { + std::function callback; + void *param; -// Services any main thread requests. Does not wait more than \p timeout_usec. -void iothread_service_main_with_timeout(uint64_t timeout_usec); + ~iothread_callback_t() { + if (param) { + free(param); + param = nullptr; + } + } +}; -/// Waits for all iothreads to terminate. -/// This is a hacky function only used in the test suite. -void iothread_drain_all(); - -// Internal implementation -void iothread_perform_impl(std::function &&, bool cant_wait = false); +extern "C" const void *iothread_trampoline(const void *callback); +extern "C" const void *iothread_trampoline2(const void *callback, const void *param); // iothread_perform invokes a handler on a background thread. inline void iothread_perform(std::function &&func) { - iothread_perform_impl(std::move(func)); + auto callback = new iothread_callback_t{std::bind([=] { + func(); + return nullptr; + }), + nullptr}; + + iothread_perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback); } /// Variant of iothread_perform that disrespects the thread limit. /// It does its best to spawn a new thread if all other threads are occupied. /// This is for cases where deferring a new thread might lead to deadlock. inline void iothread_perform_cantwait(std::function &&func) { - iothread_perform_impl(std::move(func), true); + auto callback = new iothread_callback_t{std::bind([=] { + func(); + return nullptr; + }), + nullptr}; + + iothread_perform_cantwait((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback); } -/// Creates a pthread, manipulating the signal mask so that the thread receives no signals. -/// The thread is detached. -/// The pthread runs \p func. -/// \returns true on success, false on failure. -bool make_detached_pthread(void *(*func)(void *), void *param); -bool make_detached_pthread(std::function &&func); +inline uint64_t debounce_perform(const debounce_t &debouncer, const std::function &func) { + auto callback = new iothread_callback_t{std::bind([=] { + func(); + return nullptr; + }), + nullptr}; -/// \returns a thread ID for this thread. -/// Thread IDs are never repeated. -uint64_t thread_id(); + return debouncer.perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback); +} -/// A Debounce is a simple class which executes one function in a background thread, -/// while enqueuing at most one more. New execution requests overwrite the enqueued one. -/// It has an optional timeout; if a handler does not finish within the timeout, then -/// a new thread is spawned. -class debounce_t { - public: - /// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be - /// performed on the main thread. If a function is already enqueued, this overwrites it; that - /// function will not execute. - /// If the function executes, then \p completion will be invoked on the main thread, with the - /// result of the handler. - /// The result is a token which is only of interest to the tests. - template - uint64_t perform(const Handler &handler, const Completion &completion) { - // Make a trampoline function which calls the handler, puts the result into a shared - // pointer, and then enqueues a completion. - auto trampoline = [=] { - using result_type_t = decltype(handler()); - auto result = std::make_shared(handler()); - enqueue_main_thread_result([=] { completion(std::move(*result)); }); - }; - return perform(std::move(trampoline)); - } +template +inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function &&func, + std::function &&completion) { + auto callback1 = new iothread_callback_t{[=](const void *) { + auto *result = new R(func()); + return (void *)result; + }, + nullptr}; - /// One-argument form with no completion. - /// The result is a token which is only of interest to the tests. - uint64_t perform(std::function handler); + auto callback2 = new iothread_callback_t{ + ([=](const void *r) { + const R *result = (const R *)r; + completion(*result); + delete result; + return nullptr; + }), + nullptr, + }; - explicit debounce_t(long timeout_msec = 0); - ~debounce_t(); + debouncer.perform_with_completion( + (const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1, + (const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2); +} - private: - /// Helper to enqueue a function to run on the main thread. - static void enqueue_main_thread_result(std::function func); +template +inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function &&func, + std::function &&completion) { + auto callback1 = new iothread_callback_t{[=](const void *) { + auto *result = new R(func()); + return (void *)result; + }, + nullptr}; - const long timeout_msec_; - struct impl_t; - const std::shared_ptr impl_; -}; + auto callback2 = new iothread_callback_t{ + ([=](const void *r) { + const R *result = (const R *)r; + completion(*result); + delete result; + return nullptr; + }), + nullptr, + }; + + debouncer.perform_with_completion( + (const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1, + (const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2); +} + +inline bool make_detached_pthread(const std::function &func) { + auto callback = new iothread_callback_t{ + [=](const void *) { + func(); + return nullptr; + }, + nullptr, + }; + + return make_detached_pthread((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback); +} #endif +#endif diff --git a/src/output.cpp b/src/output.cpp index 10aab08d8..da0ff8f99 100644 --- a/src/output.cpp +++ b/src/output.cpp @@ -29,6 +29,7 @@ #include "flog.h" #include "maybe.h" #include "output.h" +#include "threads.rs.h" #include "wcstringutil.h" #include "wutil.h" // IWYU pragma: keep diff --git a/src/parser.cpp b/src/parser.cpp index 9a1e9a873..207f5434a 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -30,6 +30,7 @@ #include "parse_execution.h" #include "proc.h" #include "signals.h" +#include "threads.rs.h" #include "wutil.h" // IWYU pragma: keep class io_chain_t; diff --git a/src/reader.cpp b/src/reader.cpp index a854b3c17..ce6fe49f5 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -177,19 +177,19 @@ static constexpr long kHighlightTimeoutForExecutionMs = 250; /// These are deliberately leaked to avoid shutdown dtor registration. static debounce_t &debounce_autosuggestions() { const long kAutosuggestTimeoutMs = 500; - static auto res = new debounce_t(kAutosuggestTimeoutMs); + static auto res = new_debounce_t(kAutosuggestTimeoutMs); return *res; } static debounce_t &debounce_highlighting() { const long kHighlightTimeoutMs = 500; - static auto res = new debounce_t(kHighlightTimeoutMs); + static auto res = new_debounce_t(kHighlightTimeoutMs); return *res; } static debounce_t &debounce_history_pager() { const long kHistoryPagerTimeoutMs = 500; - static auto res = new debounce_t(kHistoryPagerTimeoutMs); + static auto res = new_debounce_t(kHistoryPagerTimeoutMs); return *res; } @@ -1333,8 +1333,10 @@ void reader_data_t::fill_history_pager(bool new_search, history_search_direction } const wcstring &search_term = pager.search_field_line.text(); auto shared_this = this->shared_from_this(); - debounce_history_pager().perform( - [=]() { return history_pager_search(shared_this->history, direction, index, search_term); }, + std::function func = [=]() { + return history_pager_search(shared_this->history, direction, index, search_term); + }; + std::function completion = [=](const history_pager_result_t &result) { if (search_term != shared_this->pager.search_field_line.text()) return; // Stale request. @@ -1356,7 +1358,9 @@ void reader_data_t::fill_history_pager(bool new_search, history_search_direction shared_this->select_completion_in_direction(selection_motion_t::next, true); shared_this->super_highlight_me_plenty(); shared_this->layout_and_repaint(L"history-pager"); - }); + }; + auto &debouncer = debounce_history_pager(); + debounce_perform_with_completion(debouncer, std::move(func), std::move(completion)); } void reader_data_t::pager_selection_changed() { @@ -2107,11 +2111,14 @@ void reader_data_t::update_autosuggestion() { // Clear the autosuggestion and kick it off in the background. FLOG(reader_render, L"Autosuggesting"); autosuggestion.clear(); - auto performer = get_autosuggestion_performer(parser(), el.text(), el.position(), history); + std::function performer = + get_autosuggestion_performer(parser(), el.text(), el.position(), history); auto shared_this = this->shared_from_this(); - debounce_autosuggestions().perform(performer, [shared_this](autosuggestion_t result) { + std::function completion = [shared_this](autosuggestion_t result) { shared_this->autosuggest_completed(std::move(result)); - }); + }; + debounce_perform_with_completion(debounce_autosuggestions(), std::move(performer), + std::move(completion)); } // Accept any autosuggestion by replacing the command line with it. If full is true, take the whole @@ -2827,11 +2834,14 @@ void reader_data_t::super_highlight_me_plenty() { in_flight_highlight_request = el->text(); FLOG(reader_render, L"Highlighting"); - auto highlight_performer = get_highlight_performer(parser(), *el, true /* io_ok */); + std::function highlight_performer = + get_highlight_performer(parser(), *el, true /* io_ok */); auto shared_this = this->shared_from_this(); - debounce_highlighting().perform(highlight_performer, [shared_this](highlight_result_t result) { + std::function completion = [shared_this](highlight_result_t result) { shared_this->highlight_complete(std::move(result)); - }); + }; + debounce_perform_with_completion(debounce_highlighting(), std::move(highlight_performer), + std::move(completion)); } void reader_data_t::finish_highlighting_before_exec() {