Merge branch 'debounce'

This adds 'debounce' support to highlighting and autosuggestions, so
that we do not spawn excessive threads.
This commit is contained in:
ridiculousfish 2020-03-06 18:13:57 -08:00
commit 6f22aadaf7
5 changed files with 291 additions and 43 deletions

View file

@ -881,6 +881,100 @@ static void test_pthread() {
do_test(val == 5); do_test(val == 5);
} }
static void test_debounce() {
say(L"Testing debounce");
// Run 8 functions using a condition variable.
// Only the first and last should run.
debounce_t db;
constexpr size_t count = 8;
std::array<bool, count> handler_ran = {};
std::array<bool, count> completion_ran = {};
bool ready_to_go = false;
std::mutex m;
std::condition_variable cv;
// "Enqueue" all functions. Each one waits until ready_to_go.
for (size_t idx = 0; idx < count; idx++) {
do_test(handler_ran[idx] == false);
db.perform(
[&, idx] {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] { return ready_to_go; });
handler_ran[idx] = true;
return idx;
},
[&](size_t idx) { completion_ran[idx] = true; });
}
// We're ready to go.
{
std::unique_lock<std::mutex> lock(m);
ready_to_go = true;
}
cv.notify_all();
// Wait until the last completion is done.
while (!completion_ran.back()) {
iothread_service_completion();
}
iothread_drain_all();
// Each perform() call may displace an existing queued operation.
// Each operation waits until all are queued.
// Therefore we expect the last perform() to have run, and at most one more.
do_test(handler_ran.back());
do_test(completion_ran.back());
size_t total_ran = 0;
for (size_t idx = 0; idx < count; idx++) {
total_ran += (handler_ran[idx] ? 1 : 0);
do_test(handler_ran[idx] == completion_ran[idx]);
}
do_test(total_ran <= 2);
}
static void test_debounce_timeout() {
using namespace std::chrono;
say(L"Testing debounce timeout");
// Verify that debounce doesn't wait forever.
// Use a shared_ptr so we don't have to join our threads.
const long timeout_ms = 50;
struct data_t {
debounce_t db{timeout_ms};
bool exit_ok = false;
std::mutex m;
std::condition_variable cv;
relaxed_atomic_t<uint32_t> running{0};
};
auto data = std::make_shared<data_t>();
// Our background handler. Note this just blocks until exit_ok is set.
std::function<void()> handler = [data] {
data->running++;
std::unique_lock<std::mutex> lock(data->m);
data->cv.wait(lock, [&] { return data->exit_ok; });
};
// Spawn the handler twice. This should not modify the thread token.
uint64_t token1 = data->db.perform(handler);
uint64_t token2 = data->db.perform(handler);
do_test(token1 == token2);
// Wait 75 msec, then enqueue something else; this should spawn a new thread.
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2));
do_test(data->running == 1);
uint64_t token3 = data->db.perform(handler);
do_test(token3 > token2);
// Release all the threads.
std::unique_lock<std::mutex> lock(data->m);
data->exit_ok = true;
data->cv.notify_all();
}
static parser_test_error_bits_t detect_argument_errors(const wcstring &src) { static parser_test_error_bits_t detect_argument_errors(const wcstring &src) {
parse_node_tree_t tree; parse_node_tree_t tree;
if (!parse_tree_from_string(src, parse_flag_none, &tree, NULL, symbol_argument_list)) { if (!parse_tree_from_string(src, parse_flag_none, &tree, NULL, symbol_argument_list)) {
@ -5619,6 +5713,8 @@ int main(int argc, char **argv) {
if (should_test_function("fd_monitor")) test_fd_monitor(); if (should_test_function("fd_monitor")) test_fd_monitor();
if (should_test_function("iothread")) test_iothread(); if (should_test_function("iothread")) test_iothread();
if (should_test_function("pthread")) test_pthread(); if (should_test_function("pthread")) test_pthread();
if (should_test_function("debounce")) test_debounce();
if (should_test_function("debounce")) test_debounce_timeout();
if (should_test_function("parser")) test_parser(); if (should_test_function("parser")) test_parser();
if (should_test_function("cancellation")) test_cancellation(); if (should_test_function("cancellation")) test_cancellation();
if (should_test_function("indents")) test_indents(); if (should_test_function("indents")) test_indents();

View file

@ -16,6 +16,7 @@
#include <condition_variable> #include <condition_variable>
#include <functional> #include <functional>
#include <queue> #include <queue>
#include <thread>
#include "common.h" #include "common.h"
#include "flog.h" #include "flog.h"
@ -132,7 +133,7 @@ struct thread_pool_t {
/// These are used for completions, etc. /// These are used for completions, etc.
static thread_pool_t s_io_thread_pool(1, IO_MAX_THREADS); static thread_pool_t s_io_thread_pool(1, IO_MAX_THREADS);
static owning_lock<std::queue<work_request_t>> s_result_queue; static owning_lock<std::queue<void_function_t>> s_result_queue;
// "Do on main thread" support. // "Do on main thread" support.
static std::mutex s_main_thread_performer_lock; // protects the main thread requests static std::mutex s_main_thread_performer_lock; // protects the main thread requests
@ -195,8 +196,11 @@ maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
return result; return result;
} }
static void enqueue_thread_result(work_request_t req) { static void enqueue_thread_result(void_function_t req) {
s_result_queue.acquire()->push(std::move(req)); s_result_queue.acquire()->push(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
} }
static void *this_thread() { return (void *)(intptr_t)pthread_self(); } static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
@ -212,10 +216,7 @@ void *thread_pool_t::run() {
// Note we're using std::function's weirdo operator== here // Note we're using std::function's weirdo operator== here
if (req->completion != nullptr) { if (req->completion != nullptr) {
// Enqueue the result, and tell the main thread about it. // Enqueue the result, and tell the main thread about it.
enqueue_thread_result(req.acquire()); enqueue_thread_result(std::move(req->completion));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
} }
} }
FLOGF(iothread, L"pthread %p exiting", this_thread()); FLOGF(iothread, L"pthread %p exiting", this_thread());
@ -392,16 +393,16 @@ static void iothread_service_main_thread_requests() {
// Service the queue of results // Service the queue of results
static void iothread_service_result_queue() { static void iothread_service_result_queue() {
// Move the queue to a local variable. // Move the queue to a local variable.
std::queue<work_request_t> result_queue; std::queue<void_function_t> result_queue;
s_result_queue.acquire()->swap(result_queue); s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order // Perform each completion in order
while (!result_queue.empty()) { while (!result_queue.empty()) {
work_request_t req(std::move(result_queue.front())); void_function_t req(std::move(result_queue.front()));
result_queue.pop(); result_queue.pop();
// ensure we don't invoke empty functions, that raises an exception // ensure we don't invoke empty functions, that raises an exception
if (req.completion != nullptr) { if (req != nullptr) {
req.completion(); req();
} }
} }
} }
@ -493,3 +494,96 @@ uint64_t thread_id() {
static thread_local uint64_t tl_tid = next_thread_id(); static thread_local uint64_t tl_tid = next_thread_id();
return tl_tid; return tl_tid;
} }
// Debounce implementation note: we would like to enqueue at most one request, except if a thread
// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called
// "abandoned". This is implemented via a monotone uint64 counter, called a token.
// Every time we spawn a thread, increment the token. When the thread is completed, it compares its
// token to the active token; if they differ then this thread was abandoned.
struct debounce_t::impl_t {
// Synchronized data from debounce_t.
struct data_t {
// The (at most 1) next enqueued request, or none if none.
maybe_t<work_request_t> next_req{};
// The token of the current non-abandoned thread, or 0 if no thread is running.
uint64_t active_token{0};
// The next token to use when spawning a thread.
uint64_t next_token{1};
// The start time of the most recently run thread spawn, or request (if any).
std::chrono::time_point<std::chrono::steady_clock> start_time{};
};
owning_lock<data_t> data{};
/// Run an iteration in the background, with the given thread token.
/// \return true if we handled a request, false if there were none.
bool run_next(uint64_t token);
};
bool debounce_t::impl_t::run_next(uint64_t token) {
assert(token > 0 && "Invalid token");
// Note we are on a background thread.
maybe_t<work_request_t> req;
{
auto d = data.acquire();
if (d->next_req) {
// The value was dequeued, we are going to execute it.
req = d->next_req.acquire();
d->start_time = std::chrono::steady_clock::now();
} else {
// There is no request. If we are active, mark ourselves as no longer running.
if (token == d->active_token) {
d->active_token = 0;
}
return false;
}
}
assert(req && req->handler && "Request should have value");
req->handler();
if (req->completion) {
enqueue_thread_result(std::move(req->completion));
}
return true;
}
uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<void()> completion) {
uint64_t active_token{0};
bool spawn{false};
// Local lock.
{
auto d = impl_->data.acquire();
d->next_req = work_request_t{std::move(handler), std::move(completion)};
// If we have a timeout, and our running thread has exceeded it, abandon that thread.
if (d->active_token && timeout_msec_ > 0 &&
std::chrono::steady_clock::now() - d->start_time >
std::chrono::milliseconds(timeout_msec_)) {
// Abandon this thread by marking nothing as active.
d->active_token = 0;
}
if (!d->active_token) {
// We need to spawn a new thread.
// Mark the current time so that a new request won't immediately abandon us.
spawn = true;
d->active_token = d->next_token++;
d->start_time = std::chrono::steady_clock::now();
}
active_token = d->active_token;
assert(active_token && "Something should be active");
}
if (spawn) {
// Equip our background thread with a reference to impl, to keep it alive.
auto impl = impl_;
iothread_perform([=] {
while (impl->run_next(active_token))
; // pass
});
}
return active_token;
}
debounce_t::debounce_t(long timeout_msec)
: timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}
debounce_t::~debounce_t() = default;

View file

@ -6,8 +6,11 @@
#include <cstdint> // for uint64_t #include <cstdint> // for uint64_t
#include <functional> #include <functional>
#include <memory>
#include <type_traits> #include <type_traits>
#include "maybe.h"
/// Runs a command on a thread. /// Runs a command on a thread.
/// ///
/// \param handler The function to execute on a background thread. Accepts an arbitrary context /// \param handler The function to execute on a background thread. Accepts an arbitrary context
@ -35,41 +38,43 @@ int iothread_drain_all(void);
int iothread_perform_impl(std::function<void(void)> &&func, std::function<void(void)> &&completion, int iothread_perform_impl(std::function<void(void)> &&func, std::function<void(void)> &&completion,
bool cant_wait = false); bool cant_wait = false);
// Template helpers // This is the glue part of the handler-completion handoff.
// This is the glue part of the handler-completion handoff // Given a Handler and Completion, where the return value of Handler should be passed to Completion,
// In general we can just allocate an object, move the result of the handler into it, // this generates new void->void functions that wraps that behavior. The type T is the return type
// and then call the completion with that object. However if our type is void, // of Handler and the argument to Completion
// this won't work (new void() fails!). So we have to use this template. template <typename Handler, typename Completion,
// The type T is the return type of HANDLER and the argument to COMPLETION typename Result = typename std::result_of<Handler()>::type>
template <typename T> struct iothread_trampoline_t {
struct _iothread_trampoline { iothread_trampoline_t(const Handler &hand, const Completion &comp) {
template <typename HANDLER, typename COMPLETION> auto result = std::make_shared<maybe_t<Result>>();
static int perform(const HANDLER &handler, const COMPLETION &completion) { this->handler = [=] { *result = hand(); };
T *result = new T(); // TODO: placement new? this->completion = [=] { comp(result->acquire()); };
return iothread_perform_impl([=]() { *result = handler(); },
[=]() {
completion(std::move(*result));
delete result;
});
} }
// The generated handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
}; };
// Void specialization // Void specialization.
template <> template <typename Handler, typename Completion>
struct _iothread_trampoline<void> { struct iothread_trampoline_t<Handler, Completion, void> {
template <typename HANDLER, typename COMPLETION> iothread_trampoline_t(std::function<void()> hand, std::function<void()> comp)
static int perform(const HANDLER &handler, const COMPLETION &completion) { : handler(std::move(hand)), completion(std::move(comp)) {}
return iothread_perform_impl(handler, completion);
} // The generated handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
}; };
// iothread_perform invokes a handler on a background thread, and then a completion function // iothread_perform invokes a handler on a background thread, and then a completion function
// on the main thread. The value returned from the handler is passed to the completion. // on the main thread. The value returned from the handler is passed to the completion.
// In other words, this is like COMPLETION(HANDLER()) except the handler part is invoked // In other words, this is like Completion(Handler()) except the handler part is invoked
// on a background thread. // on a background thread.
template <typename HANDLER, typename COMPLETION> template <typename Handler, typename Completion>
int iothread_perform(const HANDLER &handler, const COMPLETION &completion) { int iothread_perform(const Handler &handler, const Completion &completion) {
return _iothread_trampoline<decltype(handler())>::perform(handler, completion); iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
return iothread_perform_impl(std::move(tramp.handler), std::move(tramp.completion));
} }
// variant of iothread_perform without a completion handler // variant of iothread_perform without a completion handler
@ -98,4 +103,35 @@ bool make_detached_pthread(std::function<void(void)> &&func);
/// Thread IDs are never repeated. /// Thread IDs are never repeated.
uint64_t thread_id(); uint64_t thread_id();
/// A Debounce is a simple class which executes one function in a background thread,
/// while enqueuing at most one more. New execution requests overwrite the enqueued one.
/// It has an optional timeout; if a handler does not finish within the timeout, then
/// a new thread is spawned.
class debounce_t {
public:
/// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be
/// performed on the main thread. If a function is already enqueued, this overwrites it; that
/// function will not execute.
/// This returns the active thread token, which is only of interest to tests.
template <typename Handler, typename Completion>
uint64_t perform(Handler handler, Completion completion) {
iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
return perform_impl(std::move(tramp.handler), std::move(tramp.completion));
}
/// One-argument form with no completion.
uint64_t perform(std::function<void()> func) { return perform_impl(std::move(func), {}); }
explicit debounce_t(long timeout_msec = 0);
~debounce_t();
private:
/// Implementation of perform().
uint64_t perform_impl(std::function<void()> handler, std::function<void()> completion);
const long timeout_msec_;
struct impl_t;
const std::shared_ptr<impl_t> impl_;
};
#endif #endif

View file

@ -677,6 +677,12 @@ std::vector<int> parse_util_compute_indents(const wcstring &src) {
const size_t src_size = src.size(); const size_t src_size = src.size();
std::vector<int> indents(src_size, -1); std::vector<int> indents(src_size, -1);
// Simple trick: if our source does not contain a newline, then all indents are 0.
if (src.find('\n') == wcstring::npos) {
std::fill(indents.begin(), indents.end(), 0);
return indents;
}
// Parse the string. We pass continue_after_error to produce a forest; the trailing indent of // Parse the string. We pass continue_after_error to produce a forest; the trailing indent of
// the last node we visited becomes the input indent of the next. I.e. in the case of 'switch // the last node we visited becomes the input indent of the next. I.e. in the case of 'switch
// foo ; cas', we get an invalid parse tree (since 'cas' is not valid) but we indent it as if it // foo ; cas', we get an invalid parse tree (since 'cas' is not valid) but we indent it as if it

View file

@ -144,6 +144,20 @@ operation_context_t get_bg_context(const std::shared_ptr<environment_t> &env,
return operation_context_t{nullptr, *env, std::move(cancel_checker)}; return operation_context_t{nullptr, *env, std::move(cancel_checker)};
} }
/// Get the debouncer for autosuggestions and background highlighting.
/// These are deliberately leaked to avoid shutdown dtor registration.
static debounce_t &debounce_autosuggestions() {
const long kAutosuggetTimeoutMs = 500;
static debounce_t *res = new debounce_t(kAutosuggetTimeoutMs);
return *res;
}
static debounce_t &debounce_highlighting() {
const long kHighlightTimeoutMs = 500;
static debounce_t *res = new debounce_t(kHighlightTimeoutMs);
return *res;
}
bool edit_t::operator==(const edit_t &other) const { bool edit_t::operator==(const edit_t &other) const {
return cursor_position_before_edit == other.cursor_position_before_edit && return cursor_position_before_edit == other.cursor_position_before_edit &&
offset == other.offset && length == other.length && old == other.old && offset == other.offset && length == other.length && old == other.old &&
@ -1482,7 +1496,8 @@ void reader_data_t::update_autosuggestion() {
auto performer = auto performer =
get_autosuggestion_performer(parser(), el->text(), el->position(), history); get_autosuggestion_performer(parser(), el->text(), el->position(), history);
auto shared_this = this->shared_from_this(); auto shared_this = this->shared_from_this();
iothread_perform(performer, [shared_this](autosuggestion_result_t result) { debounce_autosuggestions().perform(
performer, [shared_this](autosuggestion_result_t result) {
shared_this->autosuggest_completed(std::move(result)); shared_this->autosuggest_completed(std::move(result));
}); });
} }
@ -2226,7 +2241,8 @@ void reader_data_t::super_highlight_me_plenty(int match_highlight_pos_adjust, bo
} else { } else {
// Highlighting including I/O proceeds in the background. // Highlighting including I/O proceeds in the background.
auto shared_this = this->shared_from_this(); auto shared_this = this->shared_from_this();
iothread_perform(highlight_performer, [shared_this](highlight_result_t result) { debounce_highlighting().perform(highlight_performer,
[shared_this](highlight_result_t result) {
shared_this->highlight_complete(std::move(result)); shared_this->highlight_complete(std::move(result));
}); });
} }