fish-shell/src/topic_monitor.cpp

284 lines
10 KiB
C++
Raw Normal View History

#include "config.h" // IWYU pragma: keep
#include "topic_monitor.h"
#include <unistd.h>
#include <cerrno>
#include "flog.h"
#include "iothread.h"
#include "maybe.h"
#include "wcstringutil.h"
#include "wutil.h"
wcstring generation_list_t::describe() const {
wcstring result;
for (generation_t gen : this->as_array()) {
if (!result.empty()) result.push_back(L',');
if (gen == invalid_generation) {
result.append(L"-1");
} else {
result.append(to_string(gen));
}
}
return result;
}
binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) {
// sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
// On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304).
// So use fast semaphores on Linux only.
#ifdef __linux__
sem_ok_ = (0 == sem_init(&sem_, 0, 0));
#endif
if (!sem_ok_) {
auto pipes = make_autoclose_pipes();
assert(pipes.has_value() && "Failed to make pubsub pipes");
pipes_ = pipes.acquire();
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point
// where instrumented code makes certain blocking calls. But tsan cannot interrupt a signal
// call, so if we're blocked in read() (like the topic monitor wants to be!), we'll never
// receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as non-blocking
// (so reads will never block) and use select() to poll it.
#ifdef FISH_TSAN_WORKAROUNDS
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd()));
#endif
}
}
binary_semaphore_t::~binary_semaphore_t() {
// We never use sem_t on Mac. The #ifdef avoids deprecation warnings.
#ifndef __APPLE__
if (sem_ok_) (void)sem_destroy(&sem_);
#endif
}
void binary_semaphore_t::die(const wchar_t *msg) const {
wperror(msg);
DIE("unexpected failure");
}
void binary_semaphore_t::post() {
if (sem_ok_) {
int res = sem_post(&sem_);
// sem_post is non-interruptible.
if (res < 0) die(L"sem_post");
} else {
// Write exactly one byte.
ssize_t ret;
do {
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
if (ret < 0) die(L"write");
}
}
void binary_semaphore_t::wait() {
if (sem_ok_) {
int res;
do {
res = sem_wait(&sem_);
} while (res < 0 && errno == EINTR);
// Other errors here are very unexpected.
if (res < 0) die(L"sem_wait");
} else {
int fd = pipes_.read.fd();
// We must read exactly one byte.
for (;;) {
#ifdef FISH_TSAN_WORKAROUNDS
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). This call prevents that.
(void)fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout);
#endif
uint8_t ignored;
auto amt = read(fd, &ignored, sizeof ignored);
if (amt == 1) break;
// EAGAIN should only be returned in TSan case.
if (amt < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) die(L"read");
}
}
}
/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
/// pointless at-exit handler for the dtor.
static topic_monitor_t *const s_principal = new topic_monitor_t();
topic_monitor_t &topic_monitor_t::principal() {
// Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
// signal handler so it must not be lazily created.
return *s_principal;
}
topic_monitor_t::topic_monitor_t() = default;
topic_monitor_t::~topic_monitor_t() = default;
void topic_monitor_t::post(topic_t topic) {
// Beware, we may be in a signal handler!
// Atomically update the pending topics.
const uint8_t topicbit = topic_to_bit(topic);
// CAS in our bit, capturing the old status value.
status_bits_t oldstatus;
bool cas_success = false;
while (!cas_success) {
oldstatus = status_.load(std::memory_order_relaxed);
// Clear wakeup bit and set our topic bit.
status_bits_t newstatus = oldstatus;
newstatus &= ~STATUS_NEEDS_WAKEUP;
newstatus |= topicbit;
cas_success = status_.compare_exchange_weak(oldstatus, newstatus);
}
// Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set.
assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) &&
"If STATUS_NEEDS_WAKEUP is set no other bits should be set");
// If the bit was already set, then someone else posted to this topic and nobody has reacted to
// it yet. In that case we're done.
if (oldstatus & topicbit) {
return;
}
// We set a new bit.
// Check if we should wake up a thread because it was waiting.
if (oldstatus & STATUS_NEEDS_WAKEUP) {
std::atomic_thread_fence(std::memory_order_release);
sema_.post();
}
}
generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
2019-05-29 21:20:07 +00:00
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely) or a thread is waiting, just return.
2019-05-29 21:20:07 +00:00
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_bitmask_t changed_topic_bits;
2019-05-29 21:20:07 +00:00
bool cas_success;
do {
changed_topic_bits = status_.load(relaxed);
if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP)
return data->current;
cas_success = status_.compare_exchange_weak(changed_topic_bits, 0);
2019-05-29 21:20:07 +00:00
} while (!cas_success);
assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 &&
"Thread waiting bit should not be set");
2019-05-29 21:20:07 +00:00
// Update the current generation with our topics and return it.
for (topic_t topic : all_topics()) {
if (changed_topic_bits & topic_to_bit(topic)) {
data->current.at(topic) += 1;
FLOG(topic_monitor, "Updating topic", static_cast<int>(topic), "to",
data->current.at(topic));
2019-05-29 21:20:07 +00:00
}
}
// Report our change.
data_notifier_.notify_all();
return data->current;
2019-05-29 21:20:07 +00:00
}
generation_list_t topic_monitor_t::updated_gens() {
auto data = data_.acquire();
return updated_gens_in_data(data);
}
bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) {
bool become_reader = false;
auto data = data_.acquire();
for (;;) {
// See if the updated gen list has changed. If so we don't need to become the reader.
auto current = updated_gens_in_data(data);
FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current",
current.describe());
if (*gens != current) {
*gens = current;
break;
}
// The generations haven't changed. Perhaps we become the reader.
// Note we still hold the lock, so this cannot race with any other thread becoming the
// reader.
if (data->has_reader) {
// We already have a reader, wait for it to notify us and loop again.
data_notifier_.wait(data.get_lock());
continue;
} else {
// We will try to become the reader.
// Reader bit should not be set in this case.
assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting");
// Try becoming the reader by marking the reader bit.
status_bits_t expected_old = 0;
if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) {
// We failed to become the reader, perhaps because another topic post just arrived.
// Loop again.
continue;
}
// We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP.
// Now any successive topic post must signal us.
FLOG(topic_monitor, "TID", thread_id(), "becoming reader");
become_reader = true;
data->has_reader = true;
break;
}
}
return become_reader;
}
generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) {
generation_list_t gens = input_gens;
while (gens == input_gens) {
bool become_reader = try_update_gens_maybe_becoming_reader(&gens);
if (become_reader) {
// Now we are the reader. Read from the pipe, and then update with any changes.
// Note we no longer hold the lock.
assert(gens == input_gens &&
"Generations should not have changed if we are the reader.");
// Wait to be woken up.
sema_.wait();
// We are finished waiting. We must stop being the reader, and post on the condition
// variable to wake up any other threads waiting for us to finish reading.
auto data = data_.acquire();
gens = data->current;
FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(),
"read() complete, current is", gens.describe());
assert(data->has_reader && "We should be the reader");
data->has_reader = false;
data_notifier_.notify_all();
}
}
return gens;
}
bool topic_monitor_t::check(generation_list_t *gens, bool wait) {
if (!gens->any_valid()) return false;
generation_list_t current = updated_gens();
bool changed = false;
for (;;) {
// Load the topic list and see if anything has changed.
for (topic_t topic : all_topics()) {
if (gens->is_valid(topic)) {
assert(gens->at(topic) <= current.at(topic) &&
"Incoming gen count exceeded published count");
if (gens->at(topic) < current.at(topic)) {
gens->at(topic) = current.at(topic);
changed = true;
}
}
}
// If we're not waiting, or something changed, then we're done.
if (!wait || changed) {
break;
}
// Wait until our gens change.
current = await_gens(current);
}
return changed;
}