From a6b565d5028a27eb86c66be6b5c2c2fb03fa2539 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Tue, 3 Mar 2020 01:24:05 -0800 Subject: [PATCH 1/3] Optimize parse_util_compute_indents Exploit the fact that most input strings will not contain newlines, in which case we do not have to parse anything. --- src/parse_util.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/parse_util.cpp b/src/parse_util.cpp index 794b981ea..9b90737e3 100644 --- a/src/parse_util.cpp +++ b/src/parse_util.cpp @@ -677,6 +677,12 @@ std::vector parse_util_compute_indents(const wcstring &src) { const size_t src_size = src.size(); std::vector indents(src_size, -1); + // Simple trick: if our source does not contain a newline, then all indents are 0. + if (src.find('\n') == wcstring::npos) { + std::fill(indents.begin(), indents.end(), 0); + return indents; + } + // Parse the string. We pass continue_after_error to produce a forest; the trailing indent of // the last node we visited becomes the input indent of the next. I.e. in the case of 'switch // foo ; cas', we get an invalid parse tree (since 'cas' is not valid) but we indent it as if it From bde2f2111d2183660186bab4c91d3082ddda3b7c Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 2 Mar 2020 22:57:41 -0800 Subject: [PATCH 2/3] Introduce debounce_t debounce_t will be used to limit thread creation from background highlighting and autosuggestion scenarios. This is a one-element queue backed by a single thread. New requests displace any existing queued request; this reflects the fact that autosuggestions and highlighting only care about the most recent result. A timeout allows for abandoning hung threads, which may happen if you attempt to e.g. access a dead hard-mounted NFS server. We don't want this to defeat autosuggestions and highlighting permanently, so allow spawning a new thread after the timeout (here 500 ms). --- src/fish_tests.cpp | 96 ++++++++++++++++++++++++++++++++++++++ src/iothread.cpp | 114 +++++++++++++++++++++++++++++++++++++++++---- src/iothread.h | 90 ++++++++++++++++++++++++----------- 3 files changed, 263 insertions(+), 37 deletions(-) diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 554126563..73ce0616b 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -881,6 +881,100 @@ static void test_pthread() { do_test(val == 5); } +static void test_debounce() { + say(L"Testing debounce"); + // Run 8 functions using a condition variable. + // Only the first and last should run. + debounce_t db; + constexpr size_t count = 8; + std::array handler_ran = {}; + std::array completion_ran = {}; + + bool ready_to_go = false; + std::mutex m; + std::condition_variable cv; + + // "Enqueue" all functions. Each one waits until ready_to_go. + for (size_t idx = 0; idx < count; idx++) { + do_test(handler_ran[idx] == false); + db.perform( + [&, idx] { + std::unique_lock lock(m); + cv.wait(lock, [&] { return ready_to_go; }); + handler_ran[idx] = true; + return idx; + }, + [&](size_t idx) { completion_ran[idx] = true; }); + } + + // We're ready to go. + { + std::unique_lock lock(m); + ready_to_go = true; + } + cv.notify_all(); + + // Wait until the last completion is done. + while (!completion_ran.back()) { + iothread_service_completion(); + } + iothread_drain_all(); + + // Each perform() call may displace an existing queued operation. + // Each operation waits until all are queued. + // Therefore we expect the last perform() to have run, and at most one more. + + do_test(handler_ran.back()); + do_test(completion_ran.back()); + + size_t total_ran = 0; + for (size_t idx = 0; idx < count; idx++) { + total_ran += (handler_ran[idx] ? 1 : 0); + do_test(handler_ran[idx] == completion_ran[idx]); + } + do_test(total_ran <= 2); +} + +static void test_debounce_timeout() { + using namespace std::chrono; + say(L"Testing debounce timeout"); + + // Verify that debounce doesn't wait forever. + // Use a shared_ptr so we don't have to join our threads. + const long timeout_ms = 50; + struct data_t { + debounce_t db{timeout_ms}; + bool exit_ok = false; + std::mutex m; + std::condition_variable cv; + relaxed_atomic_t running{0}; + }; + auto data = std::make_shared(); + + // Our background handler. Note this just blocks until exit_ok is set. + std::function handler = [data] { + data->running++; + std::unique_lock lock(data->m); + data->cv.wait(lock, [&] { return data->exit_ok; }); + }; + + // Spawn the handler twice. This should not modify the thread token. + uint64_t token1 = data->db.perform(handler); + uint64_t token2 = data->db.perform(handler); + do_test(token1 == token2); + + // Wait 75 msec, then enqueue something else; this should spawn a new thread. + std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2)); + do_test(data->running == 1); + uint64_t token3 = data->db.perform(handler); + do_test(token3 > token2); + + // Release all the threads. + std::unique_lock lock(data->m); + data->exit_ok = true; + data->cv.notify_all(); +} + static parser_test_error_bits_t detect_argument_errors(const wcstring &src) { parse_node_tree_t tree; if (!parse_tree_from_string(src, parse_flag_none, &tree, NULL, symbol_argument_list)) { @@ -5619,6 +5713,8 @@ int main(int argc, char **argv) { if (should_test_function("fd_monitor")) test_fd_monitor(); if (should_test_function("iothread")) test_iothread(); if (should_test_function("pthread")) test_pthread(); + if (should_test_function("debounce")) test_debounce(); + if (should_test_function("debounce")) test_debounce_timeout(); if (should_test_function("parser")) test_parser(); if (should_test_function("cancellation")) test_cancellation(); if (should_test_function("indents")) test_indents(); diff --git a/src/iothread.cpp b/src/iothread.cpp index 6f2adc51a..a4b55e793 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "common.h" #include "flog.h" @@ -132,7 +133,7 @@ struct thread_pool_t { /// These are used for completions, etc. static thread_pool_t s_io_thread_pool(1, IO_MAX_THREADS); -static owning_lock> s_result_queue; +static owning_lock> s_result_queue; // "Do on main thread" support. static std::mutex s_main_thread_performer_lock; // protects the main thread requests @@ -195,8 +196,11 @@ maybe_t thread_pool_t::dequeue_work_or_commit_to_exit() { return result; } -static void enqueue_thread_result(work_request_t req) { +static void enqueue_thread_result(void_function_t req) { s_result_queue.acquire()->push(std::move(req)); + const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; + int notify_fd = get_notify_pipes().write; + assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); } static void *this_thread() { return (void *)(intptr_t)pthread_self(); } @@ -212,10 +216,7 @@ void *thread_pool_t::run() { // Note we're using std::function's weirdo operator== here if (req->completion != nullptr) { // Enqueue the result, and tell the main thread about it. - enqueue_thread_result(req.acquire()); - const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; - int notify_fd = get_notify_pipes().write; - assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); + enqueue_thread_result(std::move(req->completion)); } } FLOGF(iothread, L"pthread %p exiting", this_thread()); @@ -392,16 +393,16 @@ static void iothread_service_main_thread_requests() { // Service the queue of results static void iothread_service_result_queue() { // Move the queue to a local variable. - std::queue result_queue; + std::queue result_queue; s_result_queue.acquire()->swap(result_queue); // Perform each completion in order while (!result_queue.empty()) { - work_request_t req(std::move(result_queue.front())); + void_function_t req(std::move(result_queue.front())); result_queue.pop(); // ensure we don't invoke empty functions, that raises an exception - if (req.completion != nullptr) { - req.completion(); + if (req != nullptr) { + req(); } } } @@ -493,3 +494,96 @@ uint64_t thread_id() { static thread_local uint64_t tl_tid = next_thread_id(); return tl_tid; } + +// Debounce implementation note: we would like to enqueue at most one request, except if a thread +// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called +// "abandoned". This is implemented via a monotone uint64 counter, called a token. +// Every time we spawn a thread, increment the token. When the thread is completed, it compares its +// token to the active token; if they differ then this thread was abandoned. +struct debounce_t::impl_t { + // Synchronized data from debounce_t. + struct data_t { + // The (at most 1) next enqueued request, or none if none. + maybe_t next_req{}; + + // The token of the current non-abandoned thread, or 0 if no thread is running. + uint64_t active_token{0}; + + // The next token to use when spawning a thread. + uint64_t next_token{1}; + + // The start time of the most recently run thread spawn, or request (if any). + std::chrono::time_point start_time{}; + }; + owning_lock data{}; + + /// Run an iteration in the background, with the given thread token. + /// \return true if we handled a request, false if there were none. + bool run_next(uint64_t token); +}; + +bool debounce_t::impl_t::run_next(uint64_t token) { + assert(token > 0 && "Invalid token"); + // Note we are on a background thread. + maybe_t req; + { + auto d = data.acquire(); + if (d->next_req) { + // The value was dequeued, we are going to execute it. + req = d->next_req.acquire(); + d->start_time = std::chrono::steady_clock::now(); + } else { + // There is no request. If we are active, mark ourselves as no longer running. + if (token == d->active_token) { + d->active_token = 0; + } + return false; + } + } + + assert(req && req->handler && "Request should have value"); + req->handler(); + if (req->completion) { + enqueue_thread_result(std::move(req->completion)); + } + return true; +} + +uint64_t debounce_t::perform_impl(std::function handler, std::function completion) { + uint64_t active_token{0}; + bool spawn{false}; + // Local lock. + { + auto d = impl_->data.acquire(); + d->next_req = work_request_t{std::move(handler), std::move(completion)}; + // If we have a timeout, and our running thread has exceeded it, abandon that thread. + if (d->active_token && timeout_msec_ > 0 && + std::chrono::steady_clock::now() - d->start_time > + std::chrono::milliseconds(timeout_msec_)) { + // Abandon this thread by marking nothing as active. + d->active_token = 0; + } + if (!d->active_token) { + // We need to spawn a new thread. + // Mark the current time so that a new request won't immediately abandon us. + spawn = true; + d->active_token = d->next_token++; + d->start_time = std::chrono::steady_clock::now(); + } + active_token = d->active_token; + assert(active_token && "Something should be active"); + } + if (spawn) { + // Equip our background thread with a reference to impl, to keep it alive. + auto impl = impl_; + iothread_perform([=] { + while (impl->run_next(active_token)) + ; // pass + }); + } + return active_token; +} + +debounce_t::debounce_t(long timeout_msec) + : timeout_msec_(timeout_msec), impl_(std::make_shared()) {} +debounce_t::~debounce_t() = default; diff --git a/src/iothread.h b/src/iothread.h index 9c4976345..a669bd379 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -6,8 +6,11 @@ #include // for uint64_t #include +#include #include +#include "maybe.h" + /// Runs a command on a thread. /// /// \param handler The function to execute on a background thread. Accepts an arbitrary context @@ -35,41 +38,43 @@ int iothread_drain_all(void); int iothread_perform_impl(std::function &&func, std::function &&completion, bool cant_wait = false); -// Template helpers -// This is the glue part of the handler-completion handoff -// In general we can just allocate an object, move the result of the handler into it, -// and then call the completion with that object. However if our type is void, -// this won't work (new void() fails!). So we have to use this template. -// The type T is the return type of HANDLER and the argument to COMPLETION -template -struct _iothread_trampoline { - template - static int perform(const HANDLER &handler, const COMPLETION &completion) { - T *result = new T(); // TODO: placement new? - return iothread_perform_impl([=]() { *result = handler(); }, - [=]() { - completion(std::move(*result)); - delete result; - }); +// This is the glue part of the handler-completion handoff. +// Given a Handler and Completion, where the return value of Handler should be passed to Completion, +// this generates new void->void functions that wraps that behavior. The type T is the return type +// of Handler and the argument to Completion +template ::type> +struct iothread_trampoline_t { + iothread_trampoline_t(const Handler &hand, const Completion &comp) { + auto result = std::make_shared>(); + this->handler = [=] { *result = hand(); }; + this->completion = [=] { comp(result->acquire()); }; } + + // The generated handler and completion functions. + std::function handler; + std::function completion; }; -// Void specialization -template <> -struct _iothread_trampoline { - template - static int perform(const HANDLER &handler, const COMPLETION &completion) { - return iothread_perform_impl(handler, completion); - } +// Void specialization. +template +struct iothread_trampoline_t { + iothread_trampoline_t(std::function hand, std::function comp) + : handler(std::move(hand)), completion(std::move(comp)) {} + + // The generated handler and completion functions. + std::function handler; + std::function completion; }; // iothread_perform invokes a handler on a background thread, and then a completion function // on the main thread. The value returned from the handler is passed to the completion. -// In other words, this is like COMPLETION(HANDLER()) except the handler part is invoked +// In other words, this is like Completion(Handler()) except the handler part is invoked // on a background thread. -template -int iothread_perform(const HANDLER &handler, const COMPLETION &completion) { - return _iothread_trampoline::perform(handler, completion); +template +int iothread_perform(const Handler &handler, const Completion &completion) { + iothread_trampoline_t tramp(handler, completion); + return iothread_perform_impl(std::move(tramp.handler), std::move(tramp.completion)); } // variant of iothread_perform without a completion handler @@ -98,4 +103,35 @@ bool make_detached_pthread(std::function &&func); /// Thread IDs are never repeated. uint64_t thread_id(); +/// A Debounce is a simple class which executes one function in a background thread, +/// while enqueuing at most one more. New execution requests overwrite the enqueued one. +/// It has an optional timeout; if a handler does not finish within the timeout, then +/// a new thread is spawned. +class debounce_t { + public: + /// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be + /// performed on the main thread. If a function is already enqueued, this overwrites it; that + /// function will not execute. + /// This returns the active thread token, which is only of interest to tests. + template + uint64_t perform(Handler handler, Completion completion) { + iothread_trampoline_t tramp(handler, completion); + return perform_impl(std::move(tramp.handler), std::move(tramp.completion)); + } + + /// One-argument form with no completion. + uint64_t perform(std::function func) { return perform_impl(std::move(func), {}); } + + explicit debounce_t(long timeout_msec = 0); + ~debounce_t(); + + private: + /// Implementation of perform(). + uint64_t perform_impl(std::function handler, std::function completion); + + const long timeout_msec_; + struct impl_t; + const std::shared_ptr impl_; +}; + #endif From e334becefbf221db19e8e4e4c550da036db3b61b Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Tue, 3 Mar 2020 00:13:33 -0800 Subject: [PATCH 3/3] Adopt debounce for highlighting and autosuggestions This prevents a thundering herd of threads for certain interactive scenarios. --- src/reader.cpp | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/reader.cpp b/src/reader.cpp index 10fee7c94..cfb45b61f 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -144,6 +144,20 @@ operation_context_t get_bg_context(const std::shared_ptr &env, return operation_context_t{nullptr, *env, std::move(cancel_checker)}; } +/// Get the debouncer for autosuggestions and background highlighting. +/// These are deliberately leaked to avoid shutdown dtor registration. +static debounce_t &debounce_autosuggestions() { + const long kAutosuggetTimeoutMs = 500; + static debounce_t *res = new debounce_t(kAutosuggetTimeoutMs); + return *res; +} + +static debounce_t &debounce_highlighting() { + const long kHighlightTimeoutMs = 500; + static debounce_t *res = new debounce_t(kHighlightTimeoutMs); + return *res; +} + bool edit_t::operator==(const edit_t &other) const { return cursor_position_before_edit == other.cursor_position_before_edit && offset == other.offset && length == other.length && old == other.old && @@ -1482,9 +1496,10 @@ void reader_data_t::update_autosuggestion() { auto performer = get_autosuggestion_performer(parser(), el->text(), el->position(), history); auto shared_this = this->shared_from_this(); - iothread_perform(performer, [shared_this](autosuggestion_result_t result) { - shared_this->autosuggest_completed(std::move(result)); - }); + debounce_autosuggestions().perform( + performer, [shared_this](autosuggestion_result_t result) { + shared_this->autosuggest_completed(std::move(result)); + }); } } @@ -2226,9 +2241,10 @@ void reader_data_t::super_highlight_me_plenty(int match_highlight_pos_adjust, bo } else { // Highlighting including I/O proceeds in the background. auto shared_this = this->shared_from_this(); - iothread_perform(highlight_performer, [shared_this](highlight_result_t result) { - shared_this->highlight_complete(std::move(result)); - }); + debounce_highlighting().perform(highlight_performer, + [shared_this](highlight_result_t result) { + shared_this->highlight_complete(std::move(result)); + }); } highlight_search();