diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index ee62a046e..e682acf7b 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -67,38 +67,59 @@ topic_monitor_t::~topic_monitor_t() = default; void topic_monitor_t::post(topic_t topic) { // Beware, we may be in a signal handler! // Atomically update the pending topics. - auto rawtopics = topic_to_bit(topic); - auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed); - if ((oldtopics & rawtopics) == rawtopics) { - // No new bits were set. + const uint8_t topicbit = topic_to_bit(topic); + + // CAS in our bit, capturing the old status value. + status_bits_t oldstatus; + bool cas_success = false; + while (!cas_success) { + oldstatus = status_.load(std::memory_order_relaxed); + // Clear wakeup bit and set our topic bit. + status_bits_t newstatus = oldstatus; + newstatus &= ~STATUS_NEEDS_WAKEUP; + newstatus |= topicbit; + cas_success = status_.compare_exchange_weak(oldstatus, newstatus); + } + // Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set. + assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) && + "If STATUS_NEEDS_WAKEUP is set no other bits should be set"); + + // If the bit was already set, then someone else posted to this topic and nobody has reacted to + // it yet. In that case we're done. + if (oldstatus & topicbit) { return; } - // Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change - // by writing a byte to the pipe. - std::atomic_thread_fence(std::memory_order_release); - ssize_t ret; - do { - // write() is async signal safe. - const uint8_t v = 0; - ret = write(pipes_.write.fd(), &v, sizeof v); - } while (ret < 0 && errno == EINTR); - // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). + // We set a new bit. + // Check if we should wake up a thread because it was waiting. + if (oldstatus & STATUS_NEEDS_WAKEUP) { + std::atomic_thread_fence(std::memory_order_release); + ssize_t ret; + do { + // We must write exactly one byte. + // write() is async signal safe. + const uint8_t v = 0; + ret = write(pipes_.write.fd(), &v, sizeof v); + } while (ret < 0 && errno == EINTR); + // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). + } } generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock &data) { // Atomically acquire the pending updates, swapping in 0. - // If there are no pending updates (likely), just return. + // If there are no pending updates (likely) or a thread is waiting, just return. // Otherwise CAS in 0 and update our topics. const auto relaxed = std::memory_order_relaxed; topic_bitmask_t changed_topic_bits; bool cas_success; do { - changed_topic_bits = pending_updates_.load(relaxed); - if (changed_topic_bits == 0) return data->current; - cas_success = - pending_updates_.compare_exchange_weak(changed_topic_bits, 0, relaxed, relaxed); + changed_topic_bits = status_.load(relaxed); + if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP) + return data->current; + cas_success = status_.compare_exchange_weak(changed_topic_bits, 0); } while (!cas_success); + assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 && + "Thread waiting bit should not be set"); // Update the current generation with our topics and return it. for (topic_t topic : all_topics()) { @@ -132,13 +153,30 @@ bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *g } // The generations haven't changed. Perhaps we become the reader. - if (!data->has_reader) { + // Note we still hold the lock, so this cannot race with any other thread becoming the + // reader. + if (data->has_reader) { + // We already have a reader, wait for it to notify us and loop again. + data_notifier_.wait(data.get_lock()); + continue; + } else { + // We will try to become the reader. + // Reader bit should not be set in this case. + assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting"); + // Try becoming the reader by marking the reader bit. + status_bits_t expected_old = 0; + if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) { + // We failed to become the reader, perhaps because another topic post just arrived. + // Loop again. + continue; + } + // We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP. + // Now any successive topic post must signal us. + FLOG(topic_monitor, "TID", thread_id(), "becoming reader"); become_reader = true; data->has_reader = true; break; } - // Not the reader, wait until the reader notifies us and loop again. - data_notifier_.wait(data.get_lock()); } return become_reader; } @@ -162,9 +200,15 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen FD_SET(fd, &fds); (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */); #endif - uint8_t ignored[PIPE_BUF]; - auto unused = read(fd, ignored, sizeof ignored); - if (unused) { + // We must read exactly one byte. + for (;;) { + uint8_t ignored; + auto amt = read(fd, &ignored, sizeof ignored); + if (amt == 1) break; + if (amt < 0 && errno != EINTR && errno != EINTR) { + wperror(L"read"); + DIE("self-pipe read unexpected failure"); + } } // We are finished reading. We must stop being the reader, and post on the condition diff --git a/src/topic_monitor.h b/src/topic_monitor.h index baa11b309..76e6818f2 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -138,7 +138,7 @@ class topic_monitor_t { /// The current values. generation_list_t current{}; - /// Whether there is a thread currently reading from the notifier pipe. + /// A flag indicating that there is a current reader. bool has_reader{false}; }; owning_lock data_{}; @@ -147,9 +147,18 @@ class topic_monitor_t { /// This is associated with data_'s mutex. std::condition_variable data_notifier_{}; - /// The set of topics which have pending increments. - /// This is managed via atomics. - std::atomic pending_updates_{}; + /// A status value which describes our current state, managed via atomics. + /// Three possibilities: + /// 0: no changed topics, no thread is waiting. + /// 128: no changed topics, some thread is waiting and needs wakeup. + /// anything else: some changed topic, no thread is waiting. + /// Note that if the msb is set (status == 128) no other bit may be set. + using status_bits_t = uint8_t; + std::atomic status_{}; + + /// Sentinel status value indicating that a thread is waiting and needs a wakeup. + /// Note it is an error for this bit to be set and also any topic bit. + static constexpr uint8_t STATUS_NEEDS_WAKEUP = 128; /// Self-pipes used to communicate changes. /// The writer is a signal handler.