Initial work on strategy_named_pipe universal notifier.

This commit is contained in:
ridiculousfish 2014-05-04 15:06:40 -07:00
parent 7e44bcfd8a
commit f27232bd0a
5 changed files with 168 additions and 8 deletions

View file

@ -2219,6 +2219,11 @@ scoped_lock::scoped_lock(pthread_mutex_t &mutex) : lock_obj(&mutex), locked(fals
this->lock(); this->lock();
} }
scoped_lock::scoped_lock(lock_t &lock) : lock_obj(&lock.mutex), locked(false)
{
this->lock();
}
scoped_lock::~scoped_lock() scoped_lock::~scoped_lock()
{ {
if (locked) this->unlock(); if (locked) this->unlock();

View file

@ -537,6 +537,22 @@ public:
bool is_forked_child(); bool is_forked_child();
class lock_t
{
public:
pthread_mutex_t mutex;
lock_t()
{
pthread_mutex_init(&mutex, NULL);
}
~lock_t()
{
pthread_mutex_destroy(&mutex);
}
};
/* Basic scoped lock class */ /* Basic scoped lock class */
class scoped_lock class scoped_lock
{ {
@ -551,6 +567,7 @@ public:
void lock(void); void lock(void);
void unlock(void); void unlock(void);
scoped_lock(pthread_mutex_t &mutex); scoped_lock(pthread_mutex_t &mutex);
scoped_lock(lock_t &lock);
~scoped_lock(); ~scoped_lock();
}; };

View file

@ -46,6 +46,7 @@
#include "utf8.h" #include "utf8.h"
#include "env_universal_common.h" #include "env_universal_common.h"
#include "path.h" #include "path.h"
#include "iothread.h"
#if __APPLE__ #if __APPLE__
#define FISH_NOTIFYD_AVAILABLE 1 #define FISH_NOTIFYD_AVAILABLE 1
@ -1688,14 +1689,143 @@ public:
} }
}; };
class universal_notifier_named_pipe_t : public universal_notifier_t
{
int pipe_fd;
// Remaining variables are protected by this lock
lock_t lock;
unsigned notification_seed;
bool notifier_thread_running;
void make_pipe(const wchar_t *test_path)
{
wcstring vars_path = test_path ? wcstring(test_path) : default_vars_path();
vars_path.append(L".notifier");
const std::string narrow_path = wcs2string(vars_path);
int fd = wopen_cloexec(vars_path, O_RDWR | O_NONBLOCK, 0600);
if (fd < 0 && errno == ENOENT)
{
/* File doesn't exist, try creating it */
if (mkfifo(narrow_path.c_str(), 0600) >= 0)
{
fd = wopen_cloexec(vars_path, O_RDWR | O_NONBLOCK, 0600);
}
}
if (fd < 0)
{
// Maybe open failed, maybe mkfifo failed
const int tmp_err = errno;
const wcstring err_msg = L"Unable to make or open a FIFO at " + vars_path;
errno = tmp_err;
wperror(err_msg.c_str());
}
else
{
pipe_fd = fd;
}
}
static int notify_in_background(universal_notifier_named_pipe_t *self)
{
// We need to write some data (any data) to the pipe, then sleep for a while, then read it back.
// Nobody is expected to read it except us.
// For debugging, we write our pid.
// Because we are in a background thread with all signals masked, we do not expect to get EINTR
const int pid_nbo = htonl(getpid());
scoped_lock locker(self->lock);
assert(self->notifier_thread_running);
for (;;)
{
// Determine the seed at the time we post our request
const unsigned initial_seed = self->notification_seed;
// Perform a notification for that seed
locker.unlock();
errno = 0;
ssize_t amt_written = write(self->pipe_fd, &pid_nbo, sizeof pid_nbo);
bool wrote_all = (amt_written == sizeof pid_nbo);
int err = errno;
if (! wrote_all)
{
// Paranoia. If for some reason our pipe is filled up, then we drain it.
// This might happen if there's a bug, or if the user manually redirects something into our pipe
bool wrote_partial = (amt_written >= 0 && amt_written < sizeof pid_nbo);
bool pipe_full = (wrote_partial || err == EWOULDBLOCK || err == EAGAIN);
if (pipe_full)
{
// Drain the pipe
unsigned char buff[256];
while (read(pid_nbo, buff, sizeof buff) > 0)
{
// Keep reading
}
}
}
// Now sleep a little
const long useconds_per_second = 1000000;
usleep(useconds_per_second / 25);
// Read back what we we wrote
int read_back;
read_ignore(self->pipe_fd, &read_back, sizeof read_back);
// See if we need to go around again
locker.lock();
if (initial_seed == self->notification_seed)
{
// No more notifications came in, we're done
break;
}
}
// Now we're done
// Note that we're still locked, so it's safe to manipulate this variable
self->notifier_thread_running = false;
return 0;
}
public:
universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), notification_seed(0), notifier_thread_running(false)
{
make_pipe(test_path);
}
int notification_fd()
{
return pipe_fd;
}
bool drain_notification_fd(int fd)
{
// We deliberately do nothing here
return false;
}
void post_notification()
{
if (pipe_fd >= 0)
{
scoped_lock locker(lock);
notification_seed++;
if (! notifier_thread_running)
{
// Need to kick it off
notifier_thread_running = true;
iothread_perform(notify_in_background, this);
}
}
}
};
universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy() universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy()
{ {
#if FISH_NOTIFYD_AVAILABLE #if FISH_NOTIFYD_AVAILABLE
return strategy_notifyd; return strategy_notifyd;
#elif FISH_INOTIFY_AVAILABLE
return strategy_inotify;
#else #else
return strategy_shmem_polling; return strategy_named_pipe;
#endif #endif
} }
@ -1722,6 +1852,8 @@ universal_notifier_t *universal_notifier_t::new_notifier_for_strategy(universal_
case strategy_inotify: case strategy_inotify:
return new universal_notifier_inotify_t(test_path); return new universal_notifier_inotify_t(test_path);
case strategy_named_pipe:
return new universal_notifier_named_pipe_t(test_path);
default: default:
fprintf(stderr, "Unsupported strategy %d\n", strat); fprintf(stderr, "Unsupported strategy %d\n", strat);

View file

@ -280,6 +280,7 @@ public:
{ {
strategy_default, strategy_default,
strategy_shmem_polling, strategy_shmem_polling,
strategy_named_pipe,
strategy_inotify, strategy_inotify,
strategy_notifyd strategy_notifyd
}; };
@ -303,9 +304,6 @@ public:
/* Default instance. Other instances are possible for testing. */ /* Default instance. Other instances are possible for testing. */
static universal_notifier_t &default_notifier(); static universal_notifier_t &default_notifier();
/* Returns the fd from which to watch for events, or -1 if none */
virtual int notification_fd();
/* Does a fast poll(). Returns true if changed. */ /* Does a fast poll(). Returns true if changed. */
virtual bool poll(); virtual bool poll();
@ -318,6 +316,9 @@ public:
/* Recommended delay between polls. A value of 0 means no polling required (so no timeout) */ /* Recommended delay between polls. A value of 0 means no polling required (so no timeout) */
virtual unsigned long usec_delay_between_polls() const; virtual unsigned long usec_delay_between_polls() const;
/* Returns the fd from which to watch for events, or -1 if none */
virtual int notification_fd();
/* The notification_fd is readable; drain it. Returns true if a notification is considered to have been posted. */ /* The notification_fd is readable; drain it. Returns true if a notification is considered to have been posted. */
virtual bool drain_notification_fd(int fd); virtual bool drain_notification_fd(int fd);
}; };

View file

@ -38,7 +38,12 @@ int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int),
return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))completionCallback, static_cast<void *>(context)); return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))completionCallback, static_cast<void *>(context));
} }
/** Helper templates */ template<typename T>
int iothread_perform(int (*handler)(T *), T *context)
{
return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))0, static_cast<void *>(context));
}
template<typename T> template<typename T>
int iothread_perform_on_main(int (*handler)(T *), T *context) int iothread_perform_on_main(int (*handler)(T *), T *context)
{ {