Port remainder of iothreads from C++

This commit is contained in:
Mahmoud Al-Qudsi 2023-04-25 16:55:14 -05:00
parent 0963e6769e
commit 7f9a942f1d
2 changed files with 455 additions and 1 deletions

View file

@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
pub use self::fd_monitor_ffi::ItemWakeReason;
use self::fd_monitor_ffi::{new_fd_event_signaller, FdEventSignaller};
pub use self::fd_monitor_ffi::{new_fd_event_signaller, FdEventSignaller};
use crate::fd_readable_set::FdReadableSet;
use crate::fds::AutoCloseFd;
use crate::ffi::void_ptr;

View file

@ -2,8 +2,12 @@
//! ported directly from the cpp code so we can use rust threads instead of using pthreads.
use crate::flog::{FloggableDebug, FLOG};
use once_cell::race::OnceBox;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};
use std::time::{Duration, Instant};
// Thread ids (`std::thread::current().id()`) are passed to `FLOG!` by the thread pool code in this
// module. NOTE(review): presumably this marker impl is what lets them be logged through their
// `Debug` representation - confirm against `crate::flog`.
impl FloggableDebug for ThreadId {}
@ -22,6 +26,39 @@ static THREAD_ASSERTS_CFG_FOR_TESTING: AtomicBool = AtomicBool::new(false);
/// This allows us to notice when we've forked.
static IS_FORKED_PROC: AtomicBool = AtomicBool::new(false);
/// Maximum number of threads for the IO thread pool.
///
/// Passed as `max_threads` to [`ThreadPool::new`] when the singleton is created in [`init()`].
/// Work enqueued with `cant_wait` set may push the pool past this limit.
const IO_MAX_THREADS: usize = 1024;
/// How long an idle [`ThreadPool`] thread will wait for work (against the condition variable)
/// before exiting.
const IO_WAIT_FOR_WORK_DURATION: Duration = Duration::from_millis(500);
/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for
/// completions, etc.
///
/// It is set exactly once by [`init()`] (a second initialization panics) and is reached through
/// `borrow_io_thread_pool()`, which locks the outer mutex.
static IO_THREAD_POOL: OnceBox<Mutex<ThreadPool>> = OnceBox::new();
/// The event signaller singleton used for completions and queued main thread requests.
///
/// Background threads call `post()` on it after pushing onto [`MAIN_THREAD_QUEUE`]; the main
/// thread polls its `read_fd()` and calls `try_consume()` before draining that queue.
static NOTIFY_SIGNALLER: once_cell::sync::Lazy<&'static crate::fd_monitor::FdEventSignaller> =
    once_cell::sync::Lazy::new(|| unsafe {
        // This is leaked to avoid C++-side destructors. When ported fully to rust, we won't need to
        // leak anything.
        let signaller = crate::fd_monitor::new_fd_event_signaller();
        // Borrow the signaller out of its owning handle; panics if the handle is empty.
        let signaller_ref: &crate::fd_monitor::FdEventSignaller = signaller.as_ref().unwrap();
        // SAFETY: the transmute only stretches the borrow's lifetime to `'static` (the target type
        // is inferred from the static's declaration). This relies on the owner being forgotten
        // below, so its destructor never runs and the referent is never freed.
        // NOTE(review): this also assumes the referent lives in a stable allocation rather than
        // inline in `signaller` - confirm against the FFI handle type.
        let result = std::mem::transmute(signaller_ref);
        // Intentionally leak the owner; see above.
        std::mem::forget(signaller);
        result
    });
/// A [`ThreadPool`] or [`Debounce`] work request.
///
/// This is a boxed, run-once closure taking no arguments and returning nothing. The `Send` and
/// `'static` bounds allow it to be handed to a different thread than the one that created it.
type WorkItem = Box<dyn FnOnce() + 'static + Send>;
/// The queue of [`WorkItem`]s to be executed on the main thread. This is added to from
/// `Debounce::enqueue_main_thread_result()` and read from in `iothread_service_main()`, which
/// takes the whole queue at once and runs the items in insertion order.
///
/// Since items are enqueued from various background threads then read by the main thread, the work
/// items must implement `Send`.
static MAIN_THREAD_QUEUE: Mutex<Vec<WorkItem>> = Mutex::new(Vec::new());
/// Initialize some global static variables. Must be called at startup from the main thread.
pub fn init() {
unsafe {
@ -38,6 +75,10 @@ pub fn init() {
let result = libc::pthread_atfork(None, None, Some(child_post_fork));
assert_eq!(result, 0, "pthread_atfork() failure: {}", errno::errno());
}
IO_THREAD_POOL
.set(Box::new(Mutex::new(ThreadPool::new(1, IO_MAX_THREADS))))
.expect("IO_THREAD_POOL has already been initialized!");
}
#[inline(always)]
@ -153,6 +194,419 @@ pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
result
}
/// Data shared between the thread pool [`ThreadPool`] and worker threads [`WorkerThread`].
///
/// All fields are only read or written while holding the mutex in [`ThreadPoolShared`].
#[derive(Default)]
struct ThreadPoolProtected {
    /// The queue of outstanding, unclaimed work requests. New requests are pushed at the back and
    /// workers pop from the front.
    pub request_queue: std::collections::VecDeque<WorkItem>,
    /// The number of threads that exist in the pool. This is incremented by the pool *before* it
    /// attempts to spawn a thread and decremented by a worker when it commits to exiting (or by
    /// the pool if the spawn fails).
    pub total_threads: usize,
    /// The number of threads waiting for more work (i.e. idle threads), meaning those currently
    /// blocked on the condition variable.
    pub waiting_threads: usize,
}
/// Data behind an [`Arc`] to share between the [`ThreadPool`] and [`WorkerThread`] instances.
#[derive(Default)]
struct ThreadPoolShared {
    /// The mutex to access shared state between [`ThreadPool`] and [`WorkerThread`] instances. This
    /// is accessed both standalone and via [`cond_var`](Self::cond_var).
    mutex: Mutex<ThreadPoolProtected>,
    /// The condition variable used to wake up waiting threads. This is tied to [`mutex`](Self::mutex).
    /// The pool notifies one waiter after enqueuing work when enough workers are idle.
    cond_var: std::sync::Condvar,
}
/// A pool of background worker threads that execute queued [`WorkItem`]s.
///
/// Threads are created on demand when work is enqueued and exit on their own once they find the
/// queue empty (after an optional idle wait).
pub struct ThreadPool {
    /// The data which needs to be shared with worker threads.
    shared: Arc<ThreadPoolShared>,
    /// The minimum number of threads that will be kept waiting even when idle in the pool.
    soft_min_threads: usize,
    /// The maximum number of threads that will be created to service outstanding work requests, by
    /// default. This may be bypassed.
    max_threads: usize,
}
impl std::fmt::Debug for ThreadPool {
    /// Formats only the pool's configured thread limits; the shared queue state is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ThreadPool");
        builder.field("min_threads", &self.soft_min_threads);
        builder.field("max_threads", &self.max_threads);
        builder.finish()
    }
}
impl ThreadPool {
    /// Creates a pool with the given soft minimum and maximum thread counts.
    ///
    /// No threads are started here; they are spawned lazily as work is enqueued.
    pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
        Self {
            shared: Arc::default(),
            soft_min_threads,
            max_threads,
        }
    }

    /// Queues `func` to run on one of the pool's background threads.
    ///
    /// When `cant_wait` is `true` a new thread may be spawned even if the pool is already at its
    /// configured maximum, provided no idle thread is available to take the work.
    ///
    /// Returns the number of pool threads that existed at the moment the item was enqueued (that
    /// is, before any thread spawned on behalf of this request is counted).
    pub fn perform<F: FnOnce() + 'static + Send>(&mut self, func: F, cant_wait: bool) -> usize {
        self.perform_inner(Box::new(func), cant_wait)
    }

    /// Type-erased implementation of [`perform()`](Self::perform).
    fn perform_inner(&mut self, f: WorkItem, cant_wait: bool) -> usize {
        /// What to do once the shared lock has been released.
        enum ThreadAction {
            None,
            Wake,
            Spawn,
        }

        // Enqueue and decide on the follow-up action while holding the lock.
        let (threads_at_enqueue, action) = {
            let mut state = self.shared.mutex.lock().expect("Mutex poisoned!");
            let threads_at_enqueue = state.total_threads;
            state.request_queue.push_back(f);
            FLOG!(
                iothread,
                "enqueuing work item (count is ",
                state.request_queue.len(),
                ")"
            );

            let action = if state.waiting_threads >= state.request_queue.len() {
                // Enough idle workers exist to cover the whole queue; waking one suffices.
                ThreadAction::Wake
            } else if cant_wait || state.total_threads < self.max_threads {
                // Nobody is idle, but we are allowed (or required) to add a thread. Account for
                // it now, under the lock, so concurrent callers see the updated count.
                state.total_threads += 1;
                ThreadAction::Spawn
            } else {
                // The pool is at capacity; a busy worker will get to the item eventually.
                ThreadAction::None
            };
            (threads_at_enqueue, action)
        };

        // The lock has been dropped; now carry out the decision.
        match action {
            ThreadAction::None => {}
            ThreadAction::Wake => {
                FLOG!(iothread, "notifying thread ", std::thread::current().id());
                self.shared.cond_var.notify_one();
            }
            ThreadAction::Spawn => {
                // A spawn failure is tolerated: it implies many workers already exist, and it is
                // unlikely that all of them are about to exit, so one of them should pick up the
                // request. This is not expected outside of resource starvation.
                if self.spawn_thread() {
                    FLOG!(iothread, "pthread spawned");
                } else {
                    // Undo the optimistic increment made above.
                    let mut state = self.shared.mutex.lock().expect("Mutex poisoned!");
                    state.total_threads -= 1;
                }
            }
        }

        threads_at_enqueue
    }

    /// Tries to start a new worker thread for this pool, returning whether the spawn succeeded.
    fn spawn_thread(&mut self) -> bool {
        let worker = WorkerThread {
            shared: Arc::clone(&self.shared),
            soft_min_threads: self.soft_min_threads,
        };
        self::spawn(move || {
            worker.run();
        })
    }
}
/// The state owned by a single pool worker thread. One instance is created per spawned thread
/// and consumed by its run loop.
pub struct WorkerThread {
    /// The data shared with the [`ThreadPool`].
    shared: Arc<ThreadPoolShared>,
    /// The soft min number of threads for the associated [`ThreadPool`].
    soft_min_threads: usize,
}
impl WorkerThread {
    /// Body of a pool worker thread: keep executing work items until none can be obtained, then
    /// return (which ends the thread).
    fn run(mut self) {
        loop {
            match self.dequeue_work_or_commit_to_exit() {
                Some(work_item) => {
                    FLOG!(
                        iothread,
                        "pthread ",
                        std::thread::current().id(),
                        " got work"
                    );
                    work_item();
                }
                None => break,
            }
        }
        FLOG!(
            iothread,
            "pthread ",
            std::thread::current().id(),
            " exiting"
        );
    }

    /// Takes the next work item off the queue, possibly after waiting on the condition variable.
    ///
    /// Returns `None` when there is nothing to do; in that case the pool's thread count has
    /// already been decremented and the caller must exit without looking for further work.
    fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
        let mut state = self.shared.mutex.lock().expect("Mutex poisoned!");

        // Only linger when the queue is empty and our departure would take the pool below its
        // soft minimum.
        // NOTE(review): the comparison is `==`, so a pool that is already below the soft minimum
        // does not wait. Confirm this is intended for soft minimums greater than 1.
        let should_wait = state.request_queue.is_empty()
            && state.total_threads == self.soft_min_threads
            && IO_WAIT_FOR_WORK_DURATION > Duration::ZERO;
        if should_wait {
            state.waiting_threads += 1;
            let (reacquired, _timeout_result) = self
                .shared
                .cond_var
                .wait_timeout(state, IO_WAIT_FOR_WORK_DURATION)
                .expect("Mutex poisoned!");
            state = reacquired;
            state.waiting_threads -= 1;
        }

        // Whether or not we waited, take whatever is at the front of the queue now.
        let next = state.request_queue.pop_front();

        // Coming back empty-handed means this thread is going away. The matching decrement of the
        // increment made at spawn time has to happen here, while the lock is still held, because
        // from the pool's point of view the thread has effectively exited already.
        if next.is_none() {
            state.total_threads -= 1;
        }
        next
    }
}
/// Returns a [`MutexGuard`](std::sync::MutexGuard) containing the IO [`ThreadPool`].
///
/// The guard holds the singleton's outer lock until it is dropped, so callers should keep it only
/// for as long as they need the pool.
///
/// # Panics
///
/// Panics if the singleton has not been set up yet (it is populated by [`init()`]) or if the
/// mutex has been poisoned.
fn borrow_io_thread_pool() -> std::sync::MutexGuard<'static, ThreadPool> {
    IO_THREAD_POOL
        .get()
        .expect("IO_THREAD_POOL has not been initialized; init() must be called first!")
        .lock()
        .expect("Mutex poisoned!")
}
/// Queues `f` for execution on the IO thread pool singleton, respecting the pool's thread limit.
pub fn iothread_perform(f: impl FnOnce() + 'static + Send) {
    // The thread count returned by `perform()` is of no interest here.
    borrow_io_thread_pool().perform(f, false);
}
/// Queues priority work on the IO thread pool singleton, ignoring the pool's thread limit.
///
/// If every existing thread is busy, the pool makes a best-effort attempt to spawn a new one
/// rather than leaving the request queued. Intended for situations where postponing thread
/// creation could result in a deadlock.
pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) {
    // The thread count returned by `perform()` is of no interest here.
    borrow_io_thread_pool().perform(f, true);
}
/// Waits up to `timeout` for the notify signaller's fd to become readable and, if it does, runs
/// the queued main-thread work via [`iothread_service_main()`].
///
/// Returns without doing anything if the fd does not become readable.
pub fn iothread_service_main_with_timeout(timeout: Duration) {
    // `as_millis()` yields a `u128`; saturate instead of silently truncating with `as`.
    // NOTE(review): the value is passed on as milliseconds - confirm that this is the unit
    // `is_fd_readable` expects for its timeout argument.
    let timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
    if crate::fd_readable_set::is_fd_readable(i32::from(NOTIFY_SIGNALLER.read_fd()), timeout_ms) {
        iothread_service_main();
    }
}
/// Runs every work item currently queued for the main thread, in the order they were enqueued.
///
/// Must be called on the main thread (this is asserted).
pub fn iothread_service_main() {
    self::assert_is_main_thread();

    // Ordering matters: posters push their request and *then* signal, so we must consume the
    // signal *before* looking at the queue.
    NOTIFY_SIGNALLER.try_consume();

    // Swap the queue out for an empty one so the lock is released before any item runs.
    let pending = {
        let mut queue = MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!");
        std::mem::take(&mut *queue)
    };

    pending.into_iter().for_each(|completion| completion());
}
/// Blocks until the IO thread pool has no threads left, servicing main-thread requests while it
/// waits. Does nasty polling via select().
///
/// # Safety
///
/// No memory-safety precondition is visible here; the function is marked `unsafe` because it
/// should only be used for testing.
pub unsafe fn iothread_drain_all() {
    loop {
        // Both locks are released again before we go on to poll.
        let live_threads = {
            let pool = borrow_io_thread_pool();
            let state = pool.shared.mutex.lock().expect("Mutex poisoned!");
            let count = state.total_threads;
            count
        };
        if live_threads == 0 {
            break;
        }
        iothread_service_main_with_timeout(Duration::from_millis(1000));
    }
}
/// `Debounce` is a simple class which executes one function on a background thread while enqueuing
/// at most one more. Subsequent execution requests overwrite the enqueued one. It takes an optional
/// timeout; if a handler does not finish within the timeout then a new thread is spawned to service
/// the remaining request.
///
/// Debounce implementation note: we would like to enqueue at most one request, except if a thread
/// hangs (e.g. on fs access) then we do not want to block indefinitely - such threads are called
/// "abandoned". This is implemented via a monotone uint64 counter, called a token. Every time we
/// spawn a thread, we increment the token. When the thread has completed running a work item, it
/// compares its token to the active token; if they differ then this thread was abandoned.
///
/// Cloning a `Debounce` is cheap and yields a handle to the same shared state.
#[derive(Clone)]
pub struct Debounce {
    /// How long a running request may take before its thread is abandoned. A zero duration
    /// disables the timeout.
    timeout: Duration,
    /// The data shared between [`Debounce`] instances.
    data: Arc<Mutex<DebounceData>>,
}
/// The data shared between [`Debounce`] instances.
struct DebounceData {
    /// The (one or none) next enqueued request, overwritten each time a new call to
    /// [`Debounce::perform()`] is made.
    next_req: Option<WorkItem>,
    /// The non-zero token of the current non-abandoned thread or `None` if no thread is running.
    active_token: Option<NonZeroU64>,
    /// The next token to use when spawning a thread.
    next_token: NonZeroU64,
    /// The start time of the most recently spawned thread or request (if any). It is refreshed
    /// both when a new thread is spawned and when a thread dequeues a request to run.
    start_time: Instant,
}
impl Debounce {
    /// Creates a debouncer whose running request is abandoned once it has taken longer than
    /// `timeout`. A zero `timeout` disables abandonment.
    pub fn new(timeout: Duration) -> Self {
        let initial = DebounceData {
            next_req: None,
            active_token: None,
            next_token: NonZeroU64::new(1).unwrap(),
            start_time: Instant::now(),
        };
        Self {
            timeout,
            data: Arc::new(Mutex::new(initial)),
        }
    }

    /// Runs one iteration on behalf of the background thread identified by `token`.
    ///
    /// Returns `true` if a pending request was taken and executed. Returns `false` if nothing was
    /// pending, in which case the calling debounce thread is expected to exit; if `token` is still
    /// the active token it is cleared first.
    ///
    /// Note that this method is called from a background thread.
    fn run_next(&self, token: NonZeroU64) -> bool {
        let pending = {
            let mut state = self.data.lock().expect("Mutex poisoned!");
            let pending = state.next_req.take();
            if pending.is_some() {
                // We are about to run this request; restart the timeout clock.
                state.start_time = Instant::now();
            } else if state.active_token == Some(token) {
                // Nothing to do and we are the active thread: record that no thread is running.
                state.active_token = None;
            }
            pending
        };

        // The mutex is released by now, so the request runs without holding it.
        match pending {
            Some(request) => {
                request();
                true
            }
            None => false,
        }
    }

    /// Schedules `handler` to run on a background thread. Any request that was already waiting is
    /// replaced and will never run.
    ///
    /// The returned token is only of interest to the test suite.
    pub fn perform(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 {
        self.perform_inner(Box::new(handler))
    }

    /// Schedules `handler` to run on a background thread and, once it has produced a value,
    /// queues `completion` to be called with that value on the main thread. Any request that was
    /// already waiting is replaced and will never run.
    ///
    /// The returned token is only of interest to the test suite.
    pub fn perform_with_completion<H, R, C>(&self, handler: H, completion: C) -> NonZeroU64
    where
        H: FnOnce() -> R + 'static + Send,
        C: FnOnce(R) + 'static + Send,
        R: 'static + Send,
    {
        self.perform_inner(Box::new(move || {
            let result = handler();
            Self::enqueue_main_thread_result(Box::new(move || completion(result)));
        }))
    }

    /// Type-erased implementation shared by [`perform()`](Self::perform) and
    /// [`perform_with_completion()`](Self::perform_with_completion).
    fn perform_inner(&self, handler: WorkItem) -> NonZeroU64 {
        let (token, needs_spawn) = {
            let mut state = self.data.lock().expect("Mutex poisoned!");
            state.next_req = Some(handler);

            // With a timeout configured, a thread that has overrun it is abandoned by detaching
            // its token from this instance.
            let overran = state.active_token.is_some()
                && !self.timeout.is_zero()
                && (state.start_time.elapsed() > self.timeout);
            if overran {
                state.active_token = None;
            }

            let needs_spawn = state.active_token.is_none();
            if needs_spawn {
                // Hand out a fresh token and restart the clock so that the very next request does
                // not immediately abandon the thread we are about to start.
                let fresh = state.next_token;
                state.active_token = Some(fresh);
                state.next_token = fresh.checked_add(1).unwrap();
                state.start_time = Instant::now();
            }

            let token = state
                .active_token
                .expect("Something should be active now.");
            (token, needs_spawn)
        };

        // Only spawn once the mutex has been released.
        if needs_spawn {
            // The closure must be `'static`, so it gets its own handle to the shared state.
            let debounce = self.clone();
            iothread_perform(move || {
                while debounce.run_next(token) {
                    // Keep going for as long as there are requests to run.
                }
            });
        }
        token
    }

    /// Static helper that pushes a [`WorkItem`] onto [`MAIN_THREAD_QUEUE`] and then signals
    /// [`NOTIFY_SIGNALLER`] so the main thread knows there is work to pick up.
    fn enqueue_main_thread_result(f: WorkItem) {
        MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!").push(f);
        NOTIFY_SIGNALLER.post();
    }
}
#[test]
/// Verify that spawning a thread normally via [`std::thread::spawn()`] causes the calling thread's
/// sigmask to be inherited by the newly spawned thread.