topic_monitor to use binary semaphore instead of self-pipe

With the prior commit, the topic_monitor only writes to the pipe if a
thread is known to be waiting. This is effectively a binary semaphore, and
on systems that support anon semaphores (yes Linux, but not Mac) we can use
them. These are more efficient than self-pipes.

We add a binary_semaphore_t class which uses sem_t if sem_init succeeds,
and a self-pipe if it fails.

On Linux the seq_echo benchmark (run 1024 times) goes from 12.40 seconds to
11.59 seconds, about an 11% improvement.
This commit is contained in:
ridiculousfish 2020-08-20 13:57:51 -07:00
parent c2da175f34
commit 65e1c42a2b
2 changed files with 125 additions and 49 deletions

View file

@ -37,6 +37,78 @@ wcstring generation_list_t::describe() const {
return result;
}
binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) {
// sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
#ifndef __APPLE__
sem_ok_ = (0 == sem_init(&sem_, 0, 0));
#endif
if (!sem_ok_) {
auto pipes = make_autoclose_pipes({});
assert(pipes.has_value() && "Failed to make pubsub pipes");
pipes_ = pipes.acquire();
#ifdef TOPIC_MONITOR_TSAN_WORKAROUND
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd()));
#endif
}
}
binary_semaphore_t::~binary_semaphore_t() {
#ifndef __APPLE__
if (sem_ok_) (void)sem_destroy(&sem_);
#endif
}
void binary_semaphore_t::die(const wchar_t *msg) const {
wperror(msg);
DIE("unexpected failure");
}
void binary_semaphore_t::post() {
if (sem_ok_) {
int res = sem_post(&sem_);
// sem_post is non-interruptible.
if (res < 0) die(L"sem_post");
} else {
// Write exactly one byte.
ssize_t ret;
do {
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
if (ret < 0) die(L"write");
}
}
void binary_semaphore_t::wait() {
if (sem_ok_) {
int res;
do {
res = sem_wait(&sem_);
} while (res < 0 && errno == EINTR);
// Other errors here are very unexpected.
if (res < 0) die(L"sem_wait");
} else {
int fd = pipes_.read.fd();
#ifdef TOPIC_MONITOR_TSAN_WORKAROUND
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call
// until data is available (that is, fish would use 100% cpu while waiting for processes).
// The select prevents that.
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
(void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
#endif
// We must read exactly one byte.
for (;;) {
uint8_t ignored;
auto amt = read(fd, &ignored, sizeof ignored);
if (amt == 1) break;
if (amt < 0 && errno != EINTR) die(L"read");
}
}
}
/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
/// pointless at-exit handler for the dtor.
static topic_monitor_t *const s_principal = new topic_monitor_t();
@ -47,21 +119,7 @@ topic_monitor_t &topic_monitor_t::principal() {
return *s_principal;
}
topic_monitor_t::topic_monitor_t() {
// Set up our pipes. Assert it succeeds.
auto pipes = make_autoclose_pipes({});
assert(pipes.has_value() && "Failed to make pubsub pipes");
pipes_ = pipes.acquire();
// Make sure that our write side doesn't block, else we risk hanging in a signal handler.
// The read end must block to avoid spinning in await.
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd()));
#ifdef TOPIC_MONITOR_TSAN_WORKAROUND
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd()));
#endif
}
topic_monitor_t::topic_monitor_t() = default;
topic_monitor_t::~topic_monitor_t() = default;
void topic_monitor_t::post(topic_t topic) {
@ -94,14 +152,7 @@ void topic_monitor_t::post(topic_t topic) {
// Check if we should wake up a thread because it was waiting.
if (oldstatus & STATUS_NEEDS_WAKEUP) {
std::atomic_thread_fence(std::memory_order_release);
ssize_t ret;
do {
// We must write exactly one byte.
// write() is async signal safe.
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
sema_.post();
}
}
@ -190,28 +241,11 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen
// Note we no longer hold the lock.
assert(gens == input_gens &&
"Generations should not have changed if we are the reader.");
int fd = pipes_.read.fd();
#ifdef TOPIC_MONITOR_TSAN_WORKAROUND
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). The select prevents that.
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
(void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
#endif
// We must read exactly one byte.
for (;;) {
uint8_t ignored;
auto amt = read(fd, &ignored, sizeof ignored);
if (amt == 1) break;
if (amt < 0 && errno != EINTR && errno != EINTR) {
wperror(L"read");
DIE("self-pipe read unexpected failure");
}
}
// We are finished reading. We must stop being the reader, and post on the condition
// Wait to be woken up.
sema_.wait();
// We are finished waiting. We must stop being the reader, and post on the condition
// variable to wake up any other threads waiting for us to finish reading.
auto data = data_.acquire();
gens = data->current;

View file

@ -1,6 +1,8 @@
#ifndef FISH_TOPIC_MONITOR_H
#define FISH_TOPIC_MONITOR_H
#include <semaphore.h>
#include <array>
#include <atomic>
#include <bitset>
@ -127,8 +129,48 @@ class generation_list_t {
: sighupint(sighupint), sigchld(sigchld), internal_exit(internal_exit) {}
};
/// A simple binary semaphore.
/// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of
/// a self-pipe. Note that post() must be async-signal safe.
class binary_semaphore_t {
public:
binary_semaphore_t();
~binary_semaphore_t();
/// Release a waiting thread.
void post();
/// Wait for a post.
/// This loops on EINTR.
void wait();
private:
// Print a message and exit.
void die(const wchar_t *msg) const;
// Whether our semaphore was successfully initialized.
bool sem_ok_{};
// The semaphore, if initalized.
sem_t sem_{};
// Pipes used to emulate a semaphore, if not initialized.
autoclose_pipes_t pipes_{};
};
/// The topic monitor class. This permits querying the current generation values for topics,
/// optionally blocking until they increase.
/// What we would like to write is that we have a set of topics, and threads wait for changes on a
/// condition variable which is tickled in post(). But this can't work because post() may be called
/// from a signal handler and condition variables are not async-signal safe.
/// So instead the signal handler announces changes via a binary semaphore.
/// In the wait case, what generally happens is:
/// A thread fetches the generations, see they have not changed, and then decides to try to wait.
/// It does so by atomically swapping in STATUS_NEEDS_WAKEUP to the status bits.
/// If that succeeds, it waits on the binary semaphore. The post() call will then wake the thread
/// up. If if failed, then either a post() call updated the status values (so perhaps there is a
/// new topic post) or some other thread won the race and called wait() on the semaphore. Here our
/// thread will wait on the data_notifier_ queue.
class topic_monitor_t {
private:
using topic_bitmask_t = uint8_t;
@ -139,6 +181,7 @@ class topic_monitor_t {
generation_list_t current{};
/// A flag indicating that there is a current reader.
/// The 'reader' is responsible for calling sema_.wait().
bool has_reader{false};
};
owning_lock<data_t> data_{};
@ -160,11 +203,10 @@ class topic_monitor_t {
/// Note it is an error for this bit to be set and also any topic bit.
static constexpr uint8_t STATUS_NEEDS_WAKEUP = 128;
/// Self-pipes used to communicate changes.
/// The writer is a signal handler.
/// "The reader" refers to a thread that wants to wait for changes. Only one thread can be the
/// reader at a given time.
autoclose_pipes_t pipes_;
/// Binary semaphore used to communicate changes.
/// If status_ is STATUS_NEEDS_WAKEUP, then a thread has commited to call wait() on our sema and
/// this must be balanced by the next call to post(). Note only one thread may wait at a time.
binary_semaphore_t sema_{};
/// Apply any pending updates to the data.
/// This accepts data because it must be locked.