diff --git a/env_universal.cpp b/env_universal.cpp index 6387677d4..9efb05800 100644 --- a/env_universal.cpp +++ b/env_universal.cpp @@ -455,6 +455,7 @@ void env_universal_set(const wcstring &name, const wcstring &value, bool exportv if (! synchronizes_via_fishd() || is_dead()) { env_universal_common_set(name.c_str(), value.c_str(), exportv); + env_universal_barrier(); } else { diff --git a/env_universal_common.cpp b/env_universal_common.cpp index 6dfa01634..003a76160 100644 --- a/env_universal_common.cpp +++ b/env_universal_common.cpp @@ -265,7 +265,11 @@ void env_universal_common_set(const wchar_t *key, const wchar_t *val, bool expor void env_universal_common_sync() { callback_data_list_t callbacks; - default_universal_vars().sync(&callbacks); + bool changed = default_universal_vars().sync(&callbacks); + if (changed) + { + universal_notifier_t::default_notifier().post_notification(); + } post_callbacks(callbacks); } @@ -572,13 +576,16 @@ void env_universal_t::set_internal(const wcstring &key, const wcstring &val, boo } var_entry_t *entry = &vars[key]; - entry->val = val; - entry->exportv = exportv; - - /* If we are overwriting, then this is now modified */ - if (overwrite) + if (entry->exportv != exportv || entry->val != val) { - this->modified.insert(key); + entry->val = val; + entry->exportv = exportv; + + /* If we are overwriting, then this is now modified */ + if (overwrite) + { + this->modified.insert(key); + } } } @@ -596,8 +603,8 @@ void env_universal_t::remove_internal(const wcstring &key, bool overwrite) /* This value has been modified and we're not overwriting it. Skip it. */ return; } - this->vars.erase(key); - if (overwrite) + size_t erased = this->vars.erase(key); + if (erased > 0 && overwrite) { this->modified.insert(key); } @@ -648,6 +655,26 @@ void env_universal_t::enqueue_all(connection_t *c) const enqueue_all_internal(c); } +void env_universal_t::erase_unmodified_values() +{ + /* Delete all non-modified keys. */ + var_table_t::iterator iter = vars.begin(); + while (iter != vars.end()) + { + const wcstring &key = iter->first; + if (modified.find(key) == modified.end()) + { + // Unmodified key. Erase the old value. + vars.erase(iter++); + } + else + { + // Modified key, retain the value. + ++iter; + } + } +} + void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); @@ -656,6 +683,8 @@ void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks) const file_id_t current_file = file_id_for_fd(fd); if (current_file != last_read_file) { + /* Unmodified values are sourced from the file. Since we are about to read a different file, erase them */ + this->erase_unmodified_values(); connection_t c(fd); /* Read from the file. Do not destroy the connection; the caller is responsible for closing the fd. */ this->read_message_internal(&c, callbacks); @@ -666,6 +695,13 @@ void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks) bool env_universal_t::load_from_path(const wcstring &path, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); + + /* Check to see if the file is unchanged. We do this again in load_from_fd, but this avoids opening the file unnecessarily. */ + if (last_read_file != kInvalidFileID && file_id_for_path(path) == last_read_file) + { + return true; + } + /* OK to not use CLO_EXEC here because fishd is single threaded */ bool result = false; int fd = wopen_cloexec(path, O_RDONLY); @@ -895,6 +931,7 @@ bool env_universal_t::open_and_acquire_lock(const wcstring &path, int *out_fd) return result_fd >= 0; } +/* Returns true if modified variables were written, false if not. (There may still be variable changes due to other processes on a false return). */ bool env_universal_t::sync(callback_data_list_t *callbacks) { scoped_lock locker(lock); @@ -924,9 +961,17 @@ bool env_universal_t::sync(callback_data_list_t *callbacks) /* If we have no changes, just load */ if (modified.empty()) { - return this->load_from_path(vars_path, callbacks); + this->load_from_path(vars_path, callbacks); + return false; } +#if 0 + for (std::set::iterator iter = modified.begin(); iter != modified.end(); ++iter) + { + fprintf(stderr, "Modified %ls\n", iter->c_str()); + } +#endif + const wcstring directory = wdirname(vars_path); bool success = false; int vars_fd = -1; @@ -1304,6 +1349,7 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t #define SHMEM_VERSION_CURRENT 1000 private: + long long last_change_time; uint32_t last_seed; volatile universal_notifier_shmem_t *region; @@ -1396,7 +1442,7 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t } } - universal_notifier_shmem_poller_t() : last_seed(0), region(NULL) + universal_notifier_shmem_poller_t() : last_change_time(0), last_seed(0), region(NULL) { open_shmem(); } @@ -1429,11 +1475,29 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t { result = true; last_seed = seed; + last_change_time = get_time(); } } return result; } + unsigned long usec_delay_between_polls() const + { + // If it's been less than five seconds since the last change, we poll quickly + // Otherwise we poll more slowly + // Note that a poll is a very cheap shmem read. The bad part about making this high + // is the process scheduling/wakeups it produces + unsigned long usec_per_sec = 1000000; + if (get_time() - last_change_time < 5LL * usec_per_sec) + { + return usec_per_sec / 25; //10 times a second + } + else + { + return usec_per_sec / 3; //3 times a second + } + } + }; universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy() @@ -1490,4 +1554,10 @@ bool universal_notifier_t::poll() bool universal_notifier_t::needs_polling() const { return false; -} \ No newline at end of file +} + +unsigned long universal_notifier_t::usec_delay_between_polls() const +{ + return 0; +} + diff --git a/env_universal_common.h b/env_universal_common.h index b0ba48764..ad5ed5157 100644 --- a/env_universal_common.h +++ b/env_universal_common.h @@ -222,6 +222,7 @@ class env_universal_t bool tried_renaming; bool load_from_path(const wcstring &path, callback_data_list_t *callbacks); void load_from_fd(int fd, callback_data_list_t *callbacks); + void erase_unmodified_values(); void parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks); @@ -265,7 +266,7 @@ public: /** Loads variables at the correct path */ bool load(); - /** Reads and writes variables at the correct path */ + /** Reads and writes variables at the correct path. Returns true if modified variables were written. */ bool sync(callback_data_list_t *callbacks); /* Internal use */ @@ -289,7 +290,6 @@ public: /* No copying */ universal_notifier_t &operator=(const universal_notifier_t &); universal_notifier_t(const universal_notifier_t &x); - static notifier_strategy_t resolve_default_strategy(); public: @@ -299,7 +299,7 @@ public: /* Factory constructor. Free with delete */ static universal_notifier_t *new_notifier_for_strategy(notifier_strategy_t strat); - /* Default instance. Other instances are possible for testing */ + /* Default instance. Other instances are possible for testing. */ static universal_notifier_t &default_notifier(); /* Returns the fd from which to watch for events, or -1 if none */ @@ -313,6 +313,9 @@ public: /* Triggers a notification */ virtual void post_notification(); + + /* Recommended delay between polls. A value of 0 means no polling required (so no timeout) */ + virtual unsigned long usec_delay_between_polls() const; }; std::string get_machine_identifier(); diff --git a/fish_tests.cpp b/fish_tests.cpp index 4ab9ff199..eed6abd3e 100644 --- a/fish_tests.cpp +++ b/fish_tests.cpp @@ -2161,10 +2161,11 @@ static void test_input() } #define UVARS_PER_THREAD 8 +#define UVARS_TEST_PATH L"/tmp/fish_uvars_test/varsfile.txt" static int test_universal_helper(int *x) { - env_universal_t uvars(L"/tmp/fish_uvars_test/varsfile.txt"); + env_universal_t uvars(UVARS_TEST_PATH); for (int j=0; j < UVARS_PER_THREAD; j++) { const wcstring key = format_string(L"key_%d_%d", *x, j); @@ -2176,8 +2177,17 @@ static int test_universal_helper(int *x) err(L"Failed to sync universal variables"); } fputc('.', stderr); - fflush(stderr); } + + /* Last step is to delete the first key */ + uvars.remove(format_string(L"key_%d_%d", *x, 0)); + bool synced = uvars.sync(NULL); + if (! synced) + { + err(L"Failed to sync universal variables"); + } + fputc('.', stderr); + return 0; } @@ -2193,7 +2203,7 @@ static void test_universal() } iothread_drain_all(); - env_universal_t uvars(L"/tmp/fish_uvars_test/varsfile.txt"); + env_universal_t uvars(UVARS_TEST_PATH); bool loaded = uvars.load(); if (! loaded) { @@ -2204,22 +2214,35 @@ static void test_universal() for (int j=0; j < UVARS_PER_THREAD; j++) { const wcstring key = format_string(L"key_%d_%d", i, j); - const wcstring val = format_string(L"val_%d_%d", i, j); - const env_var_t var = uvars.get(key); - if (var != val) + env_var_t expected_val; + if (j == 0) { - err(L"Wrong value for key %ls: %ls vs %ls\n", key.c_str(), val.c_str(), var.missing() ? L"" : var.c_str()); + expected_val = env_var_t::missing_var(); + } + else + { + expected_val = format_string(L"val_%d_%d", i, j); + } + const env_var_t var = uvars.get(key); + if (j == 0) + { + assert(expected_val.missing()); + } + if (var != expected_val) + { + const wchar_t *missing_desc = L""; + err(L"Wrong value for key %ls: expected %ls, got %ls\n", key.c_str(), (expected_val.missing() ? missing_desc : expected_val.c_str()), (var.missing() ? missing_desc : var.c_str())); } } } - system("rm -Rf /tmp/fish_uvars_test"); + if (system("rm -Rf /tmp/fish_uvars_test")) err(L"rrm failed"); putc('\n', stderr); } static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy_t strategy) { - say(L"Testing universal notifiers with strategy", (int)strategy); + say(L"Testing universal notifiers with strategy %d", (int)strategy); universal_notifier_t *notifiers[16]; size_t notifier_count = sizeof notifiers / sizeof *notifiers; diff --git a/input_common.cpp b/input_common.cpp index d26f30b97..5b91073f4 100644 --- a/input_common.cpp +++ b/input_common.cpp @@ -96,7 +96,7 @@ static wint_t readb() input_flush_callbacks(); fd_set fdset; - int fd_max=0; + int fd_max = 0; int ioport = iothread_port(); int res; @@ -105,15 +105,32 @@ static wint_t readb() if (env_universal_server.fd > 0) { FD_SET(env_universal_server.fd, &fdset); - if (fd_max < env_universal_server.fd) fd_max = env_universal_server.fd; + fd_max = maxi(fd_max, env_universal_server.fd); } if (ioport > 0) { FD_SET(ioport, &fdset); - if (fd_max < ioport) fd_max = ioport; + fd_max = maxi(fd_max, ioport); + } + + /* Get our uvar notifier */ + universal_notifier_t ¬ifier = universal_notifier_t::default_notifier(); + + /* Get the suggested delay */ + struct timeval tv = {}; + const unsigned long usecs_delay = notifier.usec_delay_between_polls(); + if (usecs_delay > 0) + { + unsigned long usecs_per_sec = 1000000; + tv.tv_sec = (int)(usecs_delay / usecs_per_sec); + tv.tv_usec = (int)(usecs_delay % usecs_per_sec); + } + + res = select(fd_max + 1, &fdset, 0, 0, usecs_delay > 0 ? &tv : NULL); + if (res == 0) + { + fprintf(stderr, "ping\n"); } - - res = select(fd_max + 1, &fdset, 0, 0, 0); if (res==-1) { switch (errno) @@ -162,6 +179,12 @@ static wint_t readb() return lookahead_pop(); } } + + if (notifier.poll()) + { + fprintf(stderr, "Change note\n"); + env_universal_barrier(); + } if (ioport > 0 && FD_ISSET(ioport, &fdset)) {