mirror of
https://github.com/fish-shell/fish-shell
synced 2025-01-15 22:44:01 +00:00
Simplify topic monitoring
The topic monitor allows a client to wait for multiple events, e.g. sigchld or an internal process exit. Prior to this change a client had to specify the list of generations and the list of topics they are interested in. Simplify this to just the list of generations, with a max-value generation meaning the topic is not interesting. Also remove the use of enum_set and enum_array, it was too complex for what it offered.
This commit is contained in:
parent
f7ef91ae2a
commit
206b2d0a26
6 changed files with 170 additions and 108 deletions
|
@ -5782,22 +5782,23 @@ static void test_topic_monitor() {
|
|||
topic_monitor_t monitor;
|
||||
generation_list_t gens{};
|
||||
constexpr auto t = topic_t::sigchld;
|
||||
do_test(gens[t] == 0);
|
||||
gens.sigchld = 0;
|
||||
do_test(monitor.generation_for_topic(t) == 0);
|
||||
auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */);
|
||||
do_test(changed.none());
|
||||
do_test(gens[t] == 0);
|
||||
auto changed = monitor.check(&gens, false /* wait */);
|
||||
do_test(!changed);
|
||||
do_test(gens.sigchld == 0);
|
||||
|
||||
monitor.post(t);
|
||||
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
|
||||
do_test(changed == topic_set_t{t});
|
||||
do_test(gens[t] == 1);
|
||||
changed = monitor.check(&gens, true /* wait */);
|
||||
do_test(changed);
|
||||
do_test(gens.at(t) == 1);
|
||||
do_test(monitor.generation_for_topic(t) == 1);
|
||||
|
||||
monitor.post(t);
|
||||
do_test(monitor.generation_for_topic(t) == 2);
|
||||
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
|
||||
do_test(changed == topic_set_t{t});
|
||||
changed = monitor.check(&gens, true /* wait */);
|
||||
do_test(changed);
|
||||
do_test(gens.sigchld == 2);
|
||||
}
|
||||
|
||||
static void test_topic_monitor_torture() {
|
||||
|
@ -5807,7 +5808,7 @@ static void test_topic_monitor_torture() {
|
|||
constexpr auto t1 = topic_t::sigchld;
|
||||
constexpr auto t2 = topic_t::sighupint;
|
||||
std::vector<generation_list_t> gens;
|
||||
gens.resize(thread_count, generation_list_t{});
|
||||
gens.resize(thread_count, generation_list_t::invalids());
|
||||
std::atomic<uint32_t> post_count{};
|
||||
for (auto &gen : gens) {
|
||||
gen = monitor.current_generations();
|
||||
|
@ -5822,11 +5823,11 @@ static void test_topic_monitor_torture() {
|
|||
[&](size_t i) {
|
||||
for (size_t j = 0; j < (1 << 11); j++) {
|
||||
auto before = gens[i];
|
||||
auto changed = monitor.check(&gens[i], topic_set_t{t1, t2}, true /* wait */);
|
||||
auto changed = monitor.check(&gens[i], true /* wait */);
|
||||
(void)changed;
|
||||
do_test(before[t1] < gens[i][t1]);
|
||||
do_test(gens[i][t1] <= post_count);
|
||||
do_test(gens[i][t2] == 0);
|
||||
do_test(before.at(t1) < gens[i].at(t1));
|
||||
do_test(gens[i].at(t1) <= post_count);
|
||||
do_test(gens[i].at(t2) == 0);
|
||||
}
|
||||
auto amt = completed.fetch_add(1, std::memory_order_relaxed);
|
||||
(void)amt;
|
||||
|
|
34
src/proc.cpp
34
src/proc.cpp
|
@ -388,43 +388,35 @@ static void process_mark_finished_children(parser_t &parser, bool block_ok) {
|
|||
// The exit generation tells us if we have an exit; the signal generation allows for detecting
|
||||
// SIGHUP and SIGINT.
|
||||
// Get the gen count of all reapable processes.
|
||||
topic_set_t reaptopics{};
|
||||
generation_list_t gens{};
|
||||
gens.fill(invalid_generation);
|
||||
generation_list_t reapgens = generation_list_t::invalids();
|
||||
for (const auto &j : parser.jobs()) {
|
||||
for (const auto &proc : j->processes) {
|
||||
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
|
||||
topic_t topic = *mtopic;
|
||||
reaptopics.set(topic);
|
||||
gens[topic] = std::min(gens[topic], proc->gens_[topic]);
|
||||
|
||||
reaptopics.set(topic_t::sighupint);
|
||||
gens[topic_t::sighupint] =
|
||||
std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]);
|
||||
reapgens.set_min_from(*mtopic, proc->gens_);
|
||||
reapgens.set_min_from(topic_t::sighupint, proc->gens_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (reaptopics.none()) {
|
||||
// No reapable processes, nothing to wait for.
|
||||
// Now check for changes, optionally waiting.
|
||||
if (!topic_monitor_t::principal().check(&reapgens, block_ok)) {
|
||||
// Nothing changed.
|
||||
return;
|
||||
}
|
||||
|
||||
// Now check for changes, optionally waiting.
|
||||
auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok);
|
||||
if (changed_topics.none()) return;
|
||||
|
||||
// We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT.
|
||||
// Update the hup/int generations and reap any reapable processes.
|
||||
for (auto &j : parser.jobs()) {
|
||||
for (const auto &proc : j->processes) {
|
||||
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
|
||||
// Update the signal hup/int gen.
|
||||
proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint];
|
||||
topic_t topic = *mtopic;
|
||||
|
||||
if (proc->gens_[*mtopic] < gens[*mtopic]) {
|
||||
// Potentially reapable. Update its gen count and try reaping it.
|
||||
proc->gens_[*mtopic] = gens[*mtopic];
|
||||
// Update the signal hup/int gen.
|
||||
proc->gens_.sighupint = reapgens.sighupint;
|
||||
|
||||
if (proc->gens_.at(topic) < reapgens.at(topic)) {
|
||||
// Potentially reapable. Update its generation and try reaping it.
|
||||
proc->gens_.at(topic) = reapgens.at(topic);
|
||||
if (proc->internal_proc_) {
|
||||
// Try reaping an internal process.
|
||||
if (proc->internal_proc_->exited()) {
|
||||
|
|
|
@ -404,8 +404,7 @@ void signal_unblock_all() {
|
|||
sigprocmask(SIG_SETMASK, &iset, nullptr);
|
||||
}
|
||||
|
||||
sigchecker_t::sigchecker_t(topic_t signal) {
|
||||
topic_ = signal;
|
||||
sigchecker_t::sigchecker_t(topic_t signal) : topic_(signal) {
|
||||
// Call check() to update our generation.
|
||||
check();
|
||||
}
|
||||
|
@ -420,7 +419,7 @@ bool sigchecker_t::check() {
|
|||
|
||||
void sigchecker_t::wait() const {
|
||||
auto &tm = topic_monitor_t::principal();
|
||||
generation_list_t gens{};
|
||||
gens[topic_] = this->gen_;
|
||||
tm.check(&gens, {topic_}, true /* wait */);
|
||||
generation_list_t gens = generation_list_t::invalids();
|
||||
gens.at(topic_) = this->gen_;
|
||||
tm.check(&gens, true /* wait */);
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ void signal_clear_cancel();
|
|||
enum class topic_t : uint8_t;
|
||||
/// A sigint_detector_t can be used to check if a SIGINT (or SIGHUP) has been delivered.
|
||||
class sigchecker_t {
|
||||
topic_t topic_;
|
||||
const topic_t topic_;
|
||||
uint64_t gen_{0};
|
||||
|
||||
public:
|
||||
|
|
|
@ -7,13 +7,14 @@
|
|||
|
||||
#include "flog.h"
|
||||
#include "iothread.h"
|
||||
#include "wcstringutil.h"
|
||||
#include "wutil.h"
|
||||
|
||||
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point where
|
||||
// instrumented code makes certain blocking calls. But tsan cannot interrupt a signal call, so
|
||||
// if we're blocked in read() (like the topic monitor wants to be!), we'll never receive SIGCHLD
|
||||
// and so deadlock. So if tsan is enabled, we mark our fd as non-blocking (so reads will never
|
||||
// block) and use use select() to poll it.
|
||||
// block) and use select() to poll it.
|
||||
#if defined(__has_feature)
|
||||
#if __has_feature(thread_sanitizer)
|
||||
#define TOPIC_MONITOR_TSAN_WORKAROUND
|
||||
|
@ -23,16 +24,23 @@
|
|||
#define TOPIC_MONITOR_TSAN_WORKAROUND
|
||||
#endif
|
||||
|
||||
wcstring generation_list_t::describe() const {
|
||||
wcstring result;
|
||||
for (generation_t gen : this->as_array()) {
|
||||
if (!result.empty()) result.push_back(L',');
|
||||
if (gen == invalid_generation) {
|
||||
result.append(L"-1");
|
||||
} else {
|
||||
result.append(to_string(gen));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
|
||||
/// pointless at-exit handler for the dtor.
|
||||
static topic_monitor_t *const s_principal = new topic_monitor_t();
|
||||
|
||||
/// \return the metagen for a topic generation list.
|
||||
/// The metagen is simply the sum of topic generations. Note it is monotone.
|
||||
static generation_t metagen_for(const generation_list_t &lst) {
|
||||
return std::accumulate(lst.begin(), lst.end(), generation_t{0});
|
||||
}
|
||||
|
||||
topic_monitor_t &topic_monitor_t::principal() {
|
||||
// Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
|
||||
// signal handler so it must not be lazily created.
|
||||
|
@ -59,7 +67,7 @@ topic_monitor_t::~topic_monitor_t() = default;
|
|||
void topic_monitor_t::post(topic_t topic) {
|
||||
// Beware, we may be in a signal handler!
|
||||
// Atomically update the pending topics.
|
||||
auto rawtopics = topic_set_t{topic}.to_raw();
|
||||
auto rawtopics = topic_to_bit(topic);
|
||||
auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed);
|
||||
if ((oldtopics & rawtopics) == rawtopics) {
|
||||
// No new bits were set.
|
||||
|
@ -83,26 +91,26 @@ generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &d
|
|||
// If there are no pending updates (likely), just return.
|
||||
// Otherwise CAS in 0 and update our topics.
|
||||
const auto relaxed = std::memory_order_relaxed;
|
||||
topic_set_raw_t raw;
|
||||
topic_bitmask_t changed_topic_bits;
|
||||
bool cas_success;
|
||||
do {
|
||||
raw = pending_updates_.load(relaxed);
|
||||
if (raw == 0) return data->current_gens;
|
||||
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
|
||||
changed_topic_bits = pending_updates_.load(relaxed);
|
||||
if (changed_topic_bits == 0) return data->current;
|
||||
cas_success =
|
||||
pending_updates_.compare_exchange_weak(changed_topic_bits, 0, relaxed, relaxed);
|
||||
} while (!cas_success);
|
||||
|
||||
// Update the current generation with our topics and return it.
|
||||
auto topics = topic_set_t::from_raw(raw);
|
||||
for (topic_t topic : topic_iter_t{}) {
|
||||
if (topics.get(topic)) {
|
||||
data->current_gens.at(topic) += 1;
|
||||
for (topic_t topic : all_topics()) {
|
||||
if (changed_topic_bits & topic_to_bit(topic)) {
|
||||
data->current.at(topic) += 1;
|
||||
FLOG(topic_monitor, "Updating topic", static_cast<int>(topic), "to",
|
||||
data->current_gens.at(topic));
|
||||
data->current.at(topic));
|
||||
}
|
||||
}
|
||||
// Report our change.
|
||||
data_notifier_.notify_all();
|
||||
return data->current_gens;
|
||||
return data->current;
|
||||
}
|
||||
|
||||
generation_list_t topic_monitor_t::updated_gens() {
|
||||
|
@ -116,8 +124,8 @@ bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *g
|
|||
for (;;) {
|
||||
// See if the updated gen list has changed. If so we don't need to become the reader.
|
||||
auto current = updated_gens_in_data(data);
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(*gens), ": current",
|
||||
metagen_for(current));
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current",
|
||||
current.describe());
|
||||
if (*gens != current) {
|
||||
*gens = current;
|
||||
break;
|
||||
|
@ -162,9 +170,9 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen
|
|||
// We are finished reading. We must stop being the reader, and post on the condition
|
||||
// variable to wake up any other threads waiting for us to finish reading.
|
||||
auto data = data_.acquire();
|
||||
gens = data->current_gens;
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(input_gens),
|
||||
"read() complete, current mgen is", metagen_for(gens));
|
||||
gens = data->current;
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(),
|
||||
"read() complete, current is", gens.describe());
|
||||
assert(data->has_reader && "We should be the reader");
|
||||
data->has_reader = false;
|
||||
data_notifier_.notify_all();
|
||||
|
@ -173,26 +181,26 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen
|
|||
return gens;
|
||||
}
|
||||
|
||||
topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) {
|
||||
if (topics.none()) return topics;
|
||||
bool topic_monitor_t::check(generation_list_t *gens, bool wait) {
|
||||
if (!gens->any_valid()) return false;
|
||||
|
||||
generation_list_t current = updated_gens();
|
||||
topic_set_t changed{};
|
||||
bool changed = false;
|
||||
for (;;) {
|
||||
// Load the topic list and see if anything has changed.
|
||||
for (topic_t topic : topic_iter_t{}) {
|
||||
if (topics.get(topic)) {
|
||||
for (topic_t topic : all_topics()) {
|
||||
if (gens->is_valid(topic)) {
|
||||
assert(gens->at(topic) <= current.at(topic) &&
|
||||
"Incoming gen count exceeded published count");
|
||||
if (gens->at(topic) < current.at(topic)) {
|
||||
gens->at(topic) = current.at(topic);
|
||||
changed.set(topic);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we're not waiting, or something changed, then we're done.
|
||||
if (!wait || changed.any()) {
|
||||
if (!wait || changed) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#include <numeric>
|
||||
|
||||
#include "common.h"
|
||||
#include "enum_set.h"
|
||||
#include "io.h"
|
||||
|
||||
/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
|
||||
|
@ -32,50 +31,109 @@
|
|||
set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit.
|
||||
*/
|
||||
|
||||
/// The list of topics that may be observed.
|
||||
enum class topic_t : uint8_t {
|
||||
sigchld, // Corresponds to SIGCHLD signal.
|
||||
sighupint, // Corresponds to both SIGHUP and SIGINT signals.
|
||||
internal_exit, // Corresponds to an internal process exit.
|
||||
COUNT
|
||||
};
|
||||
|
||||
/// Allow enum_iter to be used.
|
||||
template <>
|
||||
struct enum_info_t<topic_t> {
|
||||
static constexpr auto count = topic_t::COUNT;
|
||||
};
|
||||
|
||||
/// Set of topics.
|
||||
using topic_set_t = enum_set_t<topic_t>;
|
||||
|
||||
/// Counting iterator for topics.
|
||||
using topic_iter_t = enum_iter_t<topic_t>;
|
||||
|
||||
/// A generation is a counter incremented every time the value of a topic changes.
|
||||
/// It is 64 bit so it will never wrap.
|
||||
using generation_t = uint64_t;
|
||||
|
||||
/// A generation value which is guaranteed to never be set and be larger than any valid generation.
|
||||
/// A generation value which indicates the topic is not of interest.
|
||||
constexpr generation_t invalid_generation = std::numeric_limits<generation_t>::max();
|
||||
|
||||
/// List of generation values, indexed by topics.
|
||||
/// The initial value of a generation is always 0.
|
||||
using generation_list_t = enum_array_t<generation_t, topic_t>;
|
||||
/// The list of topics which may be observed.
|
||||
enum class topic_t : uint8_t {
|
||||
sighupint, // Corresponds to both SIGHUP and SIGINT signals.
|
||||
sigchld, // Corresponds to SIGCHLD signal.
|
||||
internal_exit, // Corresponds to an internal process exit.
|
||||
};
|
||||
|
||||
/// Helper to return all topics, allowing easy iteration.
|
||||
inline std::array<topic_t, 3> all_topics() {
|
||||
return {{topic_t::sighupint, topic_t::sigchld, topic_t::internal_exit}};
|
||||
}
|
||||
|
||||
/// Simple value type containing the values for a topic.
|
||||
/// This should be kept in sync with topic_t.
|
||||
class generation_list_t {
|
||||
public:
|
||||
generation_list_t() = default;
|
||||
|
||||
generation_t sighupint{0};
|
||||
generation_t sigchld{0};
|
||||
generation_t internal_exit{0};
|
||||
|
||||
/// \return the value for a topic.
|
||||
generation_t &at(topic_t topic) {
|
||||
switch (topic) {
|
||||
case topic_t::sigchld:
|
||||
return sigchld;
|
||||
case topic_t::sighupint:
|
||||
return sighupint;
|
||||
case topic_t::internal_exit:
|
||||
return internal_exit;
|
||||
}
|
||||
DIE("Unreachable");
|
||||
}
|
||||
|
||||
generation_t at(topic_t topic) const {
|
||||
switch (topic) {
|
||||
case topic_t::sighupint:
|
||||
return sighupint;
|
||||
case topic_t::sigchld:
|
||||
return sigchld;
|
||||
case topic_t::internal_exit:
|
||||
return internal_exit;
|
||||
}
|
||||
DIE("Unreachable");
|
||||
}
|
||||
|
||||
/// \return ourselves as an array.
|
||||
std::array<generation_t, 3> as_array() const { return {{sighupint, sigchld, internal_exit}}; }
|
||||
|
||||
/// Set the value of \p topic to the smaller of our value and the value in \p other.
|
||||
void set_min_from(topic_t topic, const generation_list_t &other) {
|
||||
if (this->at(topic) > other.at(topic)) {
|
||||
this->at(topic) = other.at(topic);
|
||||
}
|
||||
}
|
||||
|
||||
/// \return whether a topic is valid.
|
||||
bool is_valid(topic_t topic) const { return this->at(topic) != invalid_generation; }
|
||||
|
||||
/// \return whether any topic is valid.
|
||||
bool any_valid() const {
|
||||
bool valid = false;
|
||||
for (auto gen : as_array()) {
|
||||
if (gen != invalid_generation) valid = true;
|
||||
}
|
||||
return valid;
|
||||
}
|
||||
|
||||
bool operator==(const generation_list_t &rhs) const { return as_array() == rhs.as_array(); }
|
||||
|
||||
bool operator!=(const generation_list_t &rhs) const { return !(*this == rhs); }
|
||||
|
||||
/// return a string representation for debugging.
|
||||
wcstring describe() const;
|
||||
|
||||
/// Generation list containing invalid generations only.
|
||||
static generation_list_t invalids() {
|
||||
return generation_list_t(invalid_generation, invalid_generation, invalid_generation);
|
||||
}
|
||||
|
||||
private:
|
||||
generation_list_t(generation_t sighupint, generation_t sigchld, generation_t internal_exit)
|
||||
: sighupint(sighupint), sigchld(sigchld), internal_exit(internal_exit) {}
|
||||
};
|
||||
|
||||
/// The topic monitor class. This permits querying the current generation values for topics,
|
||||
/// optionally blocking until they increase.
|
||||
class topic_monitor_t {
|
||||
private:
|
||||
using topic_set_raw_t = uint8_t;
|
||||
|
||||
static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count<topic_t>(),
|
||||
"topic_set_raw is too small");
|
||||
using topic_bitmask_t = uint8_t;
|
||||
|
||||
// Some stuff that needs to be protected by the same lock.
|
||||
struct data_t {
|
||||
/// The current generation list.
|
||||
generation_list_t current_gens{};
|
||||
/// The current values.
|
||||
generation_list_t current{};
|
||||
|
||||
/// Whether there is a thread currently reading from the notifier pipe.
|
||||
bool has_reader{false};
|
||||
|
@ -88,7 +146,7 @@ class topic_monitor_t {
|
|||
|
||||
/// The set of topics which have pending increments.
|
||||
/// This is managed via atomics.
|
||||
std::atomic<topic_set_raw_t> pending_updates_{};
|
||||
std::atomic<topic_bitmask_t> pending_updates_{};
|
||||
|
||||
/// Self-pipes used to communicate changes.
|
||||
/// The writer is a signal handler.
|
||||
|
@ -118,6 +176,9 @@ class topic_monitor_t {
|
|||
/// \return the current generation list, opportunistically applying any pending updates.
|
||||
generation_list_t updated_gens();
|
||||
|
||||
/// Helper to convert a topic to a bitmask containing just that topic.
|
||||
static topic_bitmask_t topic_to_bit(topic_t t) { return 1 << static_cast<topic_bitmask_t>(t); }
|
||||
|
||||
public:
|
||||
topic_monitor_t();
|
||||
~topic_monitor_t();
|
||||
|
@ -140,11 +201,12 @@ class topic_monitor_t {
|
|||
/// Access the generation for a topic.
|
||||
generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); }
|
||||
|
||||
/// See if for any topic (specified in \p topics) has changed from the values in the generation
|
||||
/// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return
|
||||
/// immediately.
|
||||
/// \return the set of topics that changed, updating the generation list \p gens.
|
||||
topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait);
|
||||
/// For each valid topic in \p gens, check to see if the current topic is larger than
|
||||
/// the value in \p gens.
|
||||
/// If \p wait is set, then wait if there are no changes; otherwise return immediately.
|
||||
/// \return true if some topic changed, false if none did.
|
||||
/// On a true return, this updates the generation list \p gens.
|
||||
bool check(generation_list_t *gens, bool wait);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue