#include "config.h" // IWYU pragma: keep #include "topic_monitor.h" #include #include #include "flog.h" #include "iothread.h" #include "wcstringutil.h" #include "wutil.h" wcstring generation_list_t::describe() const { wcstring result; for (generation_t gen : this->as_array()) { if (!result.empty()) result.push_back(L','); if (gen == invalid_generation) { result.append(L"-1"); } else { result.append(to_string(gen)); } } return result; } binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) { // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning. // On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304). // So use fast semaphores on Linux only. #ifdef __linux__ sem_ok_ = (0 == sem_init(&sem_, 0, 0)); #endif if (!sem_ok_) { auto pipes = make_autoclose_pipes(); assert(pipes.has_value() && "Failed to make pubsub pipes"); pipes_ = pipes.acquire(); // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point // where instrumented code makes certain blocking calls. But tsan cannot interrupt a signal // call, so if we're blocked in read() (like the topic monitor wants to be!), we'll never // receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as non-blocking // (so reads will never block) and use select() to poll it. #ifdef FISH_TSAN_WORKAROUNDS DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd())); #endif } } binary_semaphore_t::~binary_semaphore_t() { // We never use sem_t on Mac. The #ifdef avoids deprecation warnings. #ifndef __APPLE__ if (sem_ok_) (void)sem_destroy(&sem_); #endif } void binary_semaphore_t::die(const wchar_t *msg) const { wperror(msg); DIE("unexpected failure"); } void binary_semaphore_t::post() { if (sem_ok_) { int res = sem_post(&sem_); // sem_post is non-interruptible. if (res < 0) die(L"sem_post"); } else { // Write exactly one byte. ssize_t ret; do { const uint8_t v = 0; ret = write(pipes_.write.fd(), &v, sizeof v); } while (ret < 0 && errno == EINTR); if (ret < 0) die(L"write"); } } void binary_semaphore_t::wait() { if (sem_ok_) { int res; do { res = sem_wait(&sem_); } while (res < 0 && errno == EINTR); // Other errors here are very unexpected. if (res < 0) die(L"sem_wait"); } else { int fd = pipes_.read.fd(); // We must read exactly one byte. for (;;) { #ifdef FISH_TSAN_WORKAROUNDS // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() // call until data is available (that is, fish would use 100% cpu while waiting for // processes). This call prevents that. (void)fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout); #endif uint8_t ignored; auto amt = read(fd, &ignored, sizeof ignored); if (amt == 1) break; // EAGAIN should only be returned in TSan case. if (amt < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) die(L"read"); } } } /// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a /// pointless at-exit handler for the dtor. static topic_monitor_t *const s_principal = new topic_monitor_t(); topic_monitor_t &topic_monitor_t::principal() { // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a // signal handler so it must not be lazily created. return *s_principal; } topic_monitor_t::topic_monitor_t() = default; topic_monitor_t::~topic_monitor_t() = default; void topic_monitor_t::post(topic_t topic) { // Beware, we may be in a signal handler! // Atomically update the pending topics. const uint8_t topicbit = topic_to_bit(topic); // CAS in our bit, capturing the old status value. status_bits_t oldstatus; bool cas_success = false; while (!cas_success) { oldstatus = status_.load(std::memory_order_relaxed); // Clear wakeup bit and set our topic bit. status_bits_t newstatus = oldstatus; newstatus &= ~STATUS_NEEDS_WAKEUP; newstatus |= topicbit; cas_success = status_.compare_exchange_weak(oldstatus, newstatus); } // Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set. assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) && "If STATUS_NEEDS_WAKEUP is set no other bits should be set"); // If the bit was already set, then someone else posted to this topic and nobody has reacted to // it yet. In that case we're done. if (oldstatus & topicbit) { return; } // We set a new bit. // Check if we should wake up a thread because it was waiting. if (oldstatus & STATUS_NEEDS_WAKEUP) { std::atomic_thread_fence(std::memory_order_release); sema_.post(); } } generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock &data) { // Atomically acquire the pending updates, swapping in 0. // If there are no pending updates (likely) or a thread is waiting, just return. // Otherwise CAS in 0 and update our topics. const auto relaxed = std::memory_order_relaxed; topic_bitmask_t changed_topic_bits; bool cas_success; do { changed_topic_bits = status_.load(relaxed); if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP) return data->current; cas_success = status_.compare_exchange_weak(changed_topic_bits, 0); } while (!cas_success); assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 && "Thread waiting bit should not be set"); // Update the current generation with our topics and return it. for (topic_t topic : all_topics()) { if (changed_topic_bits & topic_to_bit(topic)) { data->current.at(topic) += 1; FLOG(topic_monitor, "Updating topic", static_cast(topic), "to", data->current.at(topic)); } } // Report our change. data_notifier_.notify_all(); return data->current; } generation_list_t topic_monitor_t::updated_gens() { auto data = data_.acquire(); return updated_gens_in_data(data); } bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) { bool become_reader = false; auto data = data_.acquire(); for (;;) { // See if the updated gen list has changed. If so we don't need to become the reader. auto current = updated_gens_in_data(data); FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current", current.describe()); if (*gens != current) { *gens = current; break; } // The generations haven't changed. Perhaps we become the reader. // Note we still hold the lock, so this cannot race with any other thread becoming the // reader. if (data->has_reader) { // We already have a reader, wait for it to notify us and loop again. data_notifier_.wait(data.get_lock()); continue; } else { // We will try to become the reader. // Reader bit should not be set in this case. assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting"); // Try becoming the reader by marking the reader bit. status_bits_t expected_old = 0; if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) { // We failed to become the reader, perhaps because another topic post just arrived. // Loop again. continue; } // We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP. // Now any successive topic post must signal us. FLOG(topic_monitor, "TID", thread_id(), "becoming reader"); become_reader = true; data->has_reader = true; break; } } return become_reader; } generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) { generation_list_t gens = input_gens; while (gens == input_gens) { bool become_reader = try_update_gens_maybe_becoming_reader(&gens); if (become_reader) { // Now we are the reader. Read from the pipe, and then update with any changes. // Note we no longer hold the lock. assert(gens == input_gens && "Generations should not have changed if we are the reader."); // Wait to be woken up. sema_.wait(); // We are finished waiting. We must stop being the reader, and post on the condition // variable to wake up any other threads waiting for us to finish reading. auto data = data_.acquire(); gens = data->current; FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(), "read() complete, current is", gens.describe()); assert(data->has_reader && "We should be the reader"); data->has_reader = false; data_notifier_.notify_all(); } } return gens; } bool topic_monitor_t::check(generation_list_t *gens, bool wait) { if (!gens->any_valid()) return false; generation_list_t current = updated_gens(); bool changed = false; for (;;) { // Load the topic list and see if anything has changed. for (topic_t topic : all_topics()) { if (gens->is_valid(topic)) { assert(gens->at(topic) <= current.at(topic) && "Incoming gen count exceeded published count"); if (gens->at(topic) < current.at(topic)) { gens->at(topic) = current.at(topic); changed = true; } } } // If we're not waiting, or something changed, then we're done. if (!wait || changed) { break; } // Wait until our gens change. current = await_gens(current); } return changed; }