Fix named pipe universal notifier. No more threads. Tests now pass.

This commit is contained in:
ridiculousfish 2014-05-05 23:33:05 -07:00
parent f27232bd0a
commit 8a263952ad
4 changed files with 172 additions and 101 deletions

View file

@ -1472,11 +1472,6 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t
}
}
bool needs_polling() const
{
return true;
}
bool poll()
{
bool result = false;
@ -1502,7 +1497,7 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t
unsigned long usec_per_sec = 1000000;
if (get_time() - last_change_time < 5LL * usec_per_sec)
{
return usec_per_sec / 25; //10 times a second
return usec_per_sec / 10; //10 times a second
}
else
{
@ -1560,7 +1555,7 @@ public:
return notify_fd;
}
bool drain_notification_fd(int fd)
bool notification_fd_became_readable(int fd)
{
/* notifyd notifications come in as 32 bit values. We don't care about the value. We set ourselves as non-blocking, so just read until we can't read any more. */
assert(fd == notify_fd);
@ -1656,7 +1651,7 @@ public:
return watch_fd;
}
bool drain_notification_fd(int fd)
bool notification_fd_became_readable(int fd)
{
assert(fd == watch_fd);
bool result = false;
@ -1689,14 +1684,18 @@ public:
}
};
#define NAMED_PIPE_FLASH_DURATION_USEC (1000000 / 10)
#define SUSTAINED_READABILITY_CLEANUP_DURATION_USEC (1000000 * 5)
class universal_notifier_named_pipe_t : public universal_notifier_t
{
int pipe_fd;
long long readback_time_usec;
size_t readback_amount;
// Remaining variables are protected by this lock
lock_t lock;
unsigned notification_seed;
bool notifier_thread_running;
bool is_readable;
long long drain_if_still_readable_time_usec;
void make_pipe(const wchar_t *test_path)
{
@ -1727,101 +1726,162 @@ class universal_notifier_named_pipe_t : public universal_notifier_t
}
}
static int notify_in_background(universal_notifier_named_pipe_t *self)
void drain_excessive_data()
{
// We need to write some data (any data) to the pipe, then sleep for a while, then read it back.
// Nobody is expected to read it except us.
// For debugging, we write our pid.
// Because we are in a background thread with all signals masked, we do not expect to get EINTR
const int pid_nbo = htonl(getpid());
scoped_lock locker(self->lock);
assert(self->notifier_thread_running);
for (;;)
{
// Determine the seed at the time we post our request
const unsigned initial_seed = self->notification_seed;
// Perform a notification for that seed
locker.unlock();
errno = 0;
ssize_t amt_written = write(self->pipe_fd, &pid_nbo, sizeof pid_nbo);
bool wrote_all = (amt_written == sizeof pid_nbo);
int err = errno;
if (! wrote_all)
{
// Paranoia. If for some reason our pipe is filled up, then we drain it.
// This might happen if there's a bug, or if the user manually redirects something into our pipe
bool wrote_partial = (amt_written >= 0 && amt_written < sizeof pid_nbo);
bool pipe_full = (wrote_partial || err == EWOULDBLOCK || err == EAGAIN);
if (pipe_full)
{
// Drain the pipe
unsigned char buff[256];
while (read(pid_nbo, buff, sizeof buff) > 0)
{
// Keep reading
}
}
}
// Now sleep a little
const long useconds_per_second = 1000000;
usleep(useconds_per_second / 25);
// Read back what we we wrote
int read_back;
read_ignore(self->pipe_fd, &read_back, sizeof read_back);
// See if we need to go around again
locker.lock();
if (initial_seed == self->notification_seed)
{
// No more notifications came in, we're done
break;
}
}
// Now we're done
// Note that we're still locked, so it's safe to manipulate this variable
self->notifier_thread_running = false;
return 0;
// The pipe seems to have data on it, that won't go away
// Read a big chunk out of it.
// We don't read until it's exhausted, because if someone were to pipe say /dev/null, that would cause us to hang!
size_t read_amt = 64 * 1024;
void *buff = malloc(read_amt);
read_ignore(this->pipe_fd, buff, read_amt);
free(buff);
}
public:
universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), notification_seed(0), notifier_thread_running(false)
universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), readback_time_usec(0), readback_amount(0), is_readable(false), drain_if_still_readable_time_usec(0)
{
make_pipe(test_path);
}
int notification_fd()
~universal_notifier_named_pipe_t()
{
return pipe_fd;
if (pipe_fd >= 0)
{
close(pipe_fd);
}
}
bool drain_notification_fd(int fd)
int notification_fd()
{
// We deliberately do nothing here
return false;
int result = -1;
if (! is_readable)
{
result = pipe_fd;
}
return result;
}
bool notification_fd_became_readable(int fd)
{
// Our fd is readable. We deliberately do not read anything out of it: if we did, other sessions may miss the notification.
// Instead, we go into "polling mode:" we do not select() on our fd for a while, and then sync in the future.
// However, if we remain readable for too long, we'll read out data
if (readback_time_usec > 0)
{
is_readable = true;
drain_if_still_readable_time_usec = get_time() + SUSTAINED_READABILITY_CLEANUP_DURATION_USEC;
}
return true;
}
void post_notification()
{
if (pipe_fd >= 0)
{
scoped_lock locker(lock);
notification_seed++;
if (! notifier_thread_running)
// We need to write some data (any data) to the pipe, then wait for a while, then read it back.
// Nobody is expected to read it except us.
int pid_nbo = htonl(getpid());
ssize_t amt_written = write(this->pipe_fd, &pid_nbo, sizeof pid_nbo);
if (amt_written < 0)
{
// Need to kick it off
notifier_thread_running = true;
iothread_perform(notify_in_background, this);
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
// Very unsual: the pipe is full!
drain_excessive_data();
}
}
// Now schedule a read for some time in the future
readback_time_usec = get_time() + NAMED_PIPE_FLASH_DURATION_USEC;
readback_amount += sizeof pid_nbo;
}
}
unsigned long usec_delay_between_polls() const
{
unsigned long result = 0;
if (this->readback_time_usec > 0)
{
// How long until the readback?
long long now = get_time();
if (now >= this->readback_time_usec)
{
// Oops, it already passed! Return something tiny.
result = 1000;
}
else
{
result = (unsigned long)(this->readback_time_usec - now);
}
}
if (is_readable)
{
// We're in polling mode
// Don't return a value less than our polling interval
if (result == 0 || result > NAMED_PIPE_FLASH_DURATION_USEC)
{
result = NAMED_PIPE_FLASH_DURATION_USEC;
}
}
return result;
}
bool poll()
{
bool result = false;
// Check if we are past the readback time
if (this->readback_time_usec > 0 && get_time() >= this->readback_time_usec)
{
// Read back what we wrote. We do nothing with the value.
while (this->readback_amount > 0)
{
char buff[64];
size_t amt_to_read = mini(this->readback_amount, sizeof buff);
read_ignore(this->pipe_fd, buff, amt_to_read);
this->readback_amount -= amt_to_read;
}
assert(this->readback_amount == 0);
this->readback_time_usec = 0;
}
// Check to see if we are doing readability polling
if (is_readable && pipe_fd >= 0)
{
// See if this is still readable
fd_set fds;
FD_ZERO(&fds);
FD_SET(this->pipe_fd, &fds);
struct timeval timeout = {};
select(this->pipe_fd + 1, &fds, NULL, NULL, &timeout);
if (! FD_ISSET(this->pipe_fd, &fds))
{
// No longer readable
is_readable = false;
drain_if_still_readable_time_usec = 0;
// Sync with the file to pick up any changes
result = true;
}
else
{
// Still readable. If it's been readable for a long time, there is probably lingering data on the pipe
if (get_time() >= drain_if_still_readable_time_usec)
{
drain_excessive_data();
}
}
}
// We use poll() as just a way to clean up our own messes. The real magic happens in notification_fd_became_readable
return false;
}
};
universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy()
{
return strategy_named_pipe;
#if FISH_NOTIFYD_AVAILABLE
return strategy_notifyd;
#else
@ -1886,7 +1946,7 @@ bool universal_notifier_t::poll()
bool universal_notifier_t::needs_polling() const
{
return false;
return this->usec_delay_between_polls() > 0;
}
unsigned long universal_notifier_t::usec_delay_between_polls() const
@ -1894,7 +1954,7 @@ unsigned long universal_notifier_t::usec_delay_between_polls() const
return 0;
}
bool universal_notifier_t::drain_notification_fd(int fd)
bool universal_notifier_t::notification_fd_became_readable(int fd)
{
return false;
}

View file

@ -308,7 +308,7 @@ public:
virtual bool poll();
/* Indicates whether this notifier requires polling. */
virtual bool needs_polling() const;
bool needs_polling() const;
/* Triggers a notification */
virtual void post_notification();
@ -320,7 +320,7 @@ public:
virtual int notification_fd();
/* The notification_fd is readable; drain it. Returns true if a notification is considered to have been posted. */
virtual bool drain_notification_fd(int fd);
virtual bool notification_fd_became_readable(int fd);
};
std::string get_machine_identifier();

View file

@ -2252,19 +2252,17 @@ bool poll_notifier(universal_notifier_t *note)
{
result = note->poll();
}
else
int fd = note->notification_fd();
if (fd >= 0)
{
int fd = note->notification_fd();
if (fd > 0)
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval tv = {0, 0};
if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds))
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval tv = {0, 0};
if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds))
{
result = note->drain_notification_fd(fd);
}
result = note->notification_fd_became_readable(fd);
}
}
return result;
@ -2288,6 +2286,9 @@ static void trigger_or_wait_for_notification(universal_notifier_t *notifier, uni
usleep(1000000 / 25);
break;
case universal_notifier_t::strategy_named_pipe:
break;
case universal_notifier_t::strategy_inotify:
{
// Hacktastic. Replace the file, then wait
@ -2342,7 +2343,17 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy
if (! poll_notifier(notifiers[i]))
{
err(L"Universal variable notifier polled failed to notice changes, with strategy %d", (int)strategy);
err(L"Universal variable notifier (%lu) %p polled failed to notice changes, with strategy %d", i, notifiers[i], (int)strategy);
}
}
// Named pipes have special cleanup requirements
if (strategy == universal_notifier_t::strategy_named_pipe)
{
usleep(1000000 / 10);
for (size_t i=0; i < notifier_count; i++)
{
poll_notifier(notifiers[i]);
}
}
}
@ -2361,20 +2372,20 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy
{
delete notifiers[i];
}
}
static void test_universal_notifiers()
{
if (system("mkdir -p /tmp/fish_uvars_test/ && touch /tmp/fish_uvars_test/varsfile.txt")) err(L"mkdir failed");
test_notifiers_with_strategy(universal_notifier_t::strategy_shmem_polling);
test_notifiers_with_strategy(universal_notifier_t::strategy_named_pipe);
#if __APPLE__
test_notifiers_with_strategy(universal_notifier_t::strategy_notifyd);
#endif
#if __linux || linux
if (system("mkdir -p /tmp/fish_uvars_test/ && touch /tmp/fish_uvars_test/varsfile.txt")) err(L"mkdir failed");
test_notifiers_with_strategy(universal_notifier_t::strategy_inotify);
if (system("rm -Rf /tmp/fish_uvars_test/")) err(L"rm failed");
#endif
if (system("rm -Rf /tmp/fish_uvars_test/")) err(L"rm failed");
}
class history_tests_t

View file

@ -192,7 +192,7 @@ static wint_t readb()
if (notifier_fd > 0 && FD_ISSET(notifier_fd, &fdset))
{
bool notified = notifier.drain_notification_fd(notifier_fd);
bool notified = notifier.notification_fd_became_readable(notifier_fd);
if (notified)
{
env_universal_barrier();