Statically type binary_semaphore_t mode of operation (#10272)

* Cleanup binary_semaphore_t by removing `sem_ok_` checks

* Fix unused import on non-Linux platforms

---------

Co-authored-by: Mahmoud Al-Qudsi <mqudsi@neosmart.net>
This commit is contained in:
Bartłomiej Maryńczak 2024-01-28 19:21:15 +01:00 committed by GitHub
parent a03162bd5b
commit 2ca102193c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -28,7 +28,6 @@ use crate::wutil::perror;
use nix::errno::Errno; use nix::errno::Errno;
use nix::unistd; use nix::unistd;
use std::cell::{Cell, UnsafeCell}; use std::cell::{Cell, UnsafeCell};
use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Condvar, Mutex, MutexGuard}; use std::sync::{Condvar, Mutex, MutexGuard};
@ -156,78 +155,69 @@ impl GenerationsList {
/// A simple binary semaphore. /// A simple binary semaphore.
/// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of /// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of
/// a self-pipe. Note that post() must be async-signal safe. /// a self-pipe. Note that post() must be async-signal safe.
pub struct binary_semaphore_t { pub enum binary_semaphore_t {
// Whether our semaphore was successfully initialized. /// Initialized semaphore.
sem_ok_: bool, /// This is Box'd so it has a stable address.
Semaphore(Pin<Box<UnsafeCell<libc::sem_t>>>),
// The semaphore, if initalized. /// Pipes used to emulate a semaphore, if not initialized.
// This is Box'd so it has a stable address. Pipes(AutoClosePipes),
sem_: Pin<Box<UnsafeCell<libc::sem_t>>>,
// Pipes used to emulate a semaphore, if not initialized.
pipes_: AutoClosePipes,
} }
impl binary_semaphore_t { impl binary_semaphore_t {
pub fn new() -> binary_semaphore_t { pub fn new() -> binary_semaphore_t {
#[allow(unused_mut, unused_assignments)]
let mut sem_ok_ = false;
// sem_t does not have an initializer in Rust so we use zeroed().
#[allow(unused_mut)]
let mut sem_ = Pin::from(Box::new(UnsafeCell::new(unsafe { mem::zeroed() })));
let mut pipes_ = AutoClosePipes::default();
// sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning. // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
// On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304). // On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304).
// So use fast semaphores on Linux only. // So use fast semaphores on Linux only.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {
let res = unsafe { libc::sem_init(sem_.get(), 0, 0) }; // sem_t does not have an initializer in Rust so we use zeroed().
sem_ok_ = res == 0; let sem = Box::pin(UnsafeCell::new(unsafe { std::mem::zeroed() }));
}
if !sem_ok_ {
let pipes = fds::make_autoclose_pipes();
assert!(pipes.is_some(), "Failed to make pubsub pipes");
pipes_ = pipes.unwrap();
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the let res = unsafe { libc::sem_init(sem.get(), 0, 0) };
// point where instrumented code makes certain blocking calls. But tsan cannot interrupt if res == 0 {
// a signal call, so if we're blocked in read() (like the topic monitor wants to be!), return Self::Semaphore(sem);
// we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as
// non-blocking (so reads will never block) and use select() to poll it.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = make_fd_nonblocking(pipes_.read.fd());
} }
} }
binary_semaphore_t {
sem_ok_, let pipes = fds::make_autoclose_pipes().expect("Failed to make pubsub pipes");
sem_,
pipes_, // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the
// point where instrumented code makes certain blocking calls. But tsan cannot interrupt
// a signal call, so if we're blocked in read() (like the topic monitor wants to be!),
// we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as
// non-blocking (so reads will never block) and use select() to poll it.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = make_fd_nonblocking(pipes.read.fd());
} }
Self::Pipes(pipes)
} }
/// Release a waiting thread. /// Release a waiting thread.
pub fn post(&self) { pub fn post(&self) {
// Beware, we are in a signal handler. // Beware, we are in a signal handler.
if self.sem_ok_ { match self {
let res = unsafe { libc::sem_post(self.sem_.get()) }; Self::Semaphore(sem) => {
// sem_post is non-interruptible. let res = unsafe { libc::sem_post(sem.get()) };
if res < 0 { // sem_post is non-interruptible.
self.die("sem_post"); if res < 0 {
} self.die("sem_post");
} else {
// Write exactly one byte.
let success;
loop {
let v: u8 = 0;
let ret = unistd::write(self.pipes_.write.fd(), std::slice::from_ref(&v));
if ret.err() == Some(Errno::EINTR) {
continue;
} }
success = ret.is_ok();
break;
} }
if !success { Self::Pipes(pipes) => {
self.die("write"); // Write exactly one byte.
let success;
loop {
let ret = unistd::write(pipes.write.fd(), &[0]);
if ret.err() == Some(Errno::EINTR) {
continue;
}
success = ret.is_ok();
break;
}
if !success {
self.die("write");
}
} }
} }
} }
@ -235,39 +225,43 @@ impl binary_semaphore_t {
/// Wait for a post. /// Wait for a post.
/// This loops on EINTR. /// This loops on EINTR.
pub fn wait(&self) { pub fn wait(&self) {
if self.sem_ok_ { match self {
let mut res; Self::Semaphore(sem) => {
loop { let mut res;
res = unsafe { libc::sem_wait(self.sem_.get()) }; loop {
if res < 0 && Errno::last() == Errno::EINTR { res = unsafe { libc::sem_wait(sem.get()) };
continue; if res < 0 && Errno::last() == Errno::EINTR {
} continue;
break; }
}
// Other errors here are very unexpected.
if res < 0 {
self.die("sem_wait");
}
} else {
let fd = self.pipes_.read.fd();
// We must read exactly one byte.
loop {
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). This call prevents that.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout);
}
let mut ignored: u8 = 0;
let amt = unistd::read(fd, std::slice::from_mut(&mut ignored));
if amt.ok() == Some(1) {
break; break;
} }
// EAGAIN should only be returned in TSan case. // Other errors here are very unexpected.
if amt.is_err() if res < 0 {
&& (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN)) self.die("sem_wait");
{ }
self.die("read"); }
Self::Pipes(pipes) => {
let fd = pipes.read.fd();
// We must read exactly one byte.
loop {
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). This call prevents that.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ =
fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout);
}
let mut ignored: u8 = 0;
let amt = unistd::read(fd, std::slice::from_mut(&mut ignored));
if amt.ok() == Some(1) {
break;
}
// EAGAIN should only be returned in TSan case.
if amt.is_err()
&& (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN))
{
self.die("read");
}
} }
} }
} }
@ -281,12 +275,8 @@ impl binary_semaphore_t {
impl Drop for binary_semaphore_t { impl Drop for binary_semaphore_t {
fn drop(&mut self) { fn drop(&mut self) {
// We never use sem_t on Mac. The #ifdef avoids deprecation warnings. if let Self::Semaphore(sem) = self {
#[cfg(target_os = "linux")] _ = unsafe { libc::sem_destroy(sem.get()) };
{
if self.sem_ok_ {
_ = unsafe { libc::sem_destroy(self.sem_.get()) };
}
} }
} }
} }