Use some fancy atomics in topic_monitor

The topic monitor is what allows a thread to wait for any of a set of
events. Events are identified by a bit in a "pending update" mask. Prior to
this fix, post() would atomically set the bit, and if it was newly set,
announce the change by unconditionally writing to a self-pipe. Threads
could wait for new posts by reading from the pipe.

This is less efficient than it could be; in particular if no thread is
waiting on the pipe, then the write() is unnecessary. This slows down our
signal handler.

Change the design in the following way: if a thread is committed to
waiting, then it atomically sets the "pending update" mask (now just called
status) to a sentinel value STATUS_NEEDS_WAKEUP. Then post() will only
write to the self-pipe if it sees that there is a thread waiting. This
reduces the number of syscalls.

The total effect is hardly noticeable (usually there is a thread waiting)
but it will be important for the next commit.
This commit is contained in:
ridiculousfish 2020-08-20 12:07:49 -07:00
parent 9ffaade0db
commit c2da175f34
2 changed files with 82 additions and 29 deletions

View file

@ -67,38 +67,59 @@ topic_monitor_t::~topic_monitor_t() = default;
void topic_monitor_t::post(topic_t topic) {
// Beware, we may be in a signal handler!
// Atomically update the pending topics.
auto rawtopics = topic_to_bit(topic);
auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed);
if ((oldtopics & rawtopics) == rawtopics) {
// No new bits were set.
const uint8_t topicbit = topic_to_bit(topic);
// CAS in our bit, capturing the old status value.
status_bits_t oldstatus;
bool cas_success = false;
while (!cas_success) {
oldstatus = status_.load(std::memory_order_relaxed);
// Clear wakeup bit and set our topic bit.
status_bits_t newstatus = oldstatus;
newstatus &= ~STATUS_NEEDS_WAKEUP;
newstatus |= topicbit;
cas_success = status_.compare_exchange_weak(oldstatus, newstatus);
}
// Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set.
assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) &&
"If STATUS_NEEDS_WAKEUP is set no other bits should be set");
// If the bit was already set, then someone else posted to this topic and nobody has reacted to
// it yet. In that case we're done.
if (oldstatus & topicbit) {
return;
}
// Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change
// by writing a byte to the pipe.
std::atomic_thread_fence(std::memory_order_release);
ssize_t ret;
do {
// write() is async signal safe.
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
// We set a new bit.
// Check if we should wake up a thread because it was waiting.
if (oldstatus & STATUS_NEEDS_WAKEUP) {
std::atomic_thread_fence(std::memory_order_release);
ssize_t ret;
do {
// We must write exactly one byte.
// write() is async signal safe.
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
}
}
generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely), just return.
// If there are no pending updates (likely) or a thread is waiting, just return.
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_bitmask_t changed_topic_bits;
bool cas_success;
do {
changed_topic_bits = pending_updates_.load(relaxed);
if (changed_topic_bits == 0) return data->current;
cas_success =
pending_updates_.compare_exchange_weak(changed_topic_bits, 0, relaxed, relaxed);
changed_topic_bits = status_.load(relaxed);
if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP)
return data->current;
cas_success = status_.compare_exchange_weak(changed_topic_bits, 0);
} while (!cas_success);
assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 &&
"Thread waiting bit should not be set");
// Update the current generation with our topics and return it.
for (topic_t topic : all_topics()) {
@ -132,13 +153,30 @@ bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *g
}
// The generations haven't changed. Perhaps we become the reader.
if (!data->has_reader) {
// Note we still hold the lock, so this cannot race with any other thread becoming the
// reader.
if (data->has_reader) {
// We already have a reader, wait for it to notify us and loop again.
data_notifier_.wait(data.get_lock());
continue;
} else {
// We will try to become the reader.
// Reader bit should not be set in this case.
assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting");
// Try becoming the reader by marking the reader bit.
status_bits_t expected_old = 0;
if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) {
// We failed to become the reader, perhaps because another topic post just arrived.
// Loop again.
continue;
}
// We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP.
// Now any successive topic post must signal us.
FLOG(topic_monitor, "TID", thread_id(), "becoming reader");
become_reader = true;
data->has_reader = true;
break;
}
// Not the reader, wait until the reader notifies us and loop again.
data_notifier_.wait(data.get_lock());
}
return become_reader;
}
@ -162,9 +200,15 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen
FD_SET(fd, &fds);
(void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
#endif
uint8_t ignored[PIPE_BUF];
auto unused = read(fd, ignored, sizeof ignored);
if (unused) {
// We must read exactly one byte.
for (;;) {
uint8_t ignored;
auto amt = read(fd, &ignored, sizeof ignored);
if (amt == 1) break;
if (amt < 0 && errno != EINTR && errno != EINTR) {
wperror(L"read");
DIE("self-pipe read unexpected failure");
}
}
// We are finished reading. We must stop being the reader, and post on the condition

View file

@ -138,7 +138,7 @@ class topic_monitor_t {
/// The current values.
generation_list_t current{};
/// Whether there is a thread currently reading from the notifier pipe.
/// A flag indicating that there is a current reader.
bool has_reader{false};
};
owning_lock<data_t> data_{};
@ -147,9 +147,18 @@ class topic_monitor_t {
/// This is associated with data_'s mutex.
std::condition_variable data_notifier_{};
/// The set of topics which have pending increments.
/// This is managed via atomics.
std::atomic<topic_bitmask_t> pending_updates_{};
/// A status value which describes our current state, managed via atomics.
/// Three possibilities:
/// 0: no changed topics, no thread is waiting.
/// 128: no changed topics, some thread is waiting and needs wakeup.
/// anything else: some changed topic, no thread is waiting.
/// Note that if the msb is set (status == 128) no other bit may be set.
using status_bits_t = uint8_t;
std::atomic<uint8_t> status_{};
/// Sentinel status value indicating that a thread is waiting and needs a wakeup.
/// Note it is an error for this bit to be set and also any topic bit.
static constexpr uint8_t STATUS_NEEDS_WAKEUP = 128;
/// Self-pipes used to communicate changes.
/// The writer is a signal handler.