diff --git a/src/io.cpp b/src/io.cpp index 92f880e69..443badeb7 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -127,34 +127,37 @@ void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) { void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { ASSERT_IS_MAIN_THREAD(); - assert(!fillthread_ && "Already have a fillthread"); + assert(!fillthread_running() && "Already have a fillthread"); // We want our background thread to own the fd but it's not easy to move into a std::function. // Use a shared_ptr. auto fdref = move_to_sharedptr(std::move(fd)); - // Our function to read until the receiver is closed. - // It's OK to capture 'this' by value because 'this' owns the background thread and joins it - // before dtor. - std::function func = [this, fdref]() { - this->run_background_fillthread(std::move(*fdref)); - }; + // Construct a promise that can go into our background thread. + auto promise = std::make_shared>(); - pthread_t fillthread{}; - if (!make_pthread(&fillthread, std::move(func))) { - wperror(L"make_pthread"); - } - fillthread_ = fillthread; + // Get the future associated with our promise. + // Note this should only ever be called once. + fillthread_waiter_ = promise->get_future(); + + // Run our function to read until the receiver is closed. + // It's OK to capture 'this' by value because 'this' owns the background thread and waits for it + // before dtor. + iothread_perform([this, promise, fdref]() { + this->run_background_fillthread(std::move(*fdref)); + promise->set_value(); + }); } void io_buffer_t::complete_background_fillthread() { ASSERT_IS_MAIN_THREAD(); - assert(fillthread_ && "Should have a fillthread"); + assert(fillthread_running() && "Should have a fillthread"); shutdown_fillthread_ = true; - void *ignored = nullptr; - int err = pthread_join(*fillthread_, &ignored); - DIE_ON_FAILURE(err); - fillthread_.reset(); + + // Wait for the fillthread to fulfill its promise, and then clear the future so we know we no + // longer have one. + fillthread_waiter_.wait(); + fillthread_waiter_ = {}; } shared_ptr io_bufferfill_t::create(const io_chain_t &conflicts, @@ -195,7 +198,7 @@ io_pipe_t::~io_pipe_t() = default; io_bufferfill_t::~io_bufferfill_t() = default; io_buffer_t::~io_buffer_t() { - assert(!fillthread_ && "io_buffer_t destroyed with outstanding fillthread"); + assert(!fillthread_running() && "io_buffer_t destroyed with outstanding fillthread"); } void io_chain_t::remove(const shared_ptr &element) { diff --git a/src/io.h b/src/io.h index 2c9c79deb..5ea8287de 100644 --- a/src/io.h +++ b/src/io.h @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -279,8 +280,9 @@ class io_buffer_t { /// Atomic flag indicating our fillthread should shut down. relaxed_atomic_bool_t shutdown_fillthread_{false}; - /// The background fillthread itself, if any. - maybe_t fillthread_{}; + /// The future allowing synchronization with the background fillthread, if the fillthread is + /// running. The fillthread fulfills the corresponding promise when it exits. + std::future fillthread_waiter_{}; /// Read limit of the buffer. const size_t read_limit_; @@ -297,6 +299,9 @@ class io_buffer_t { /// End the background fillthread operation. void complete_background_fillthread(); + /// Helper to return whether the fillthread is running. + bool fillthread_running() const { return fillthread_waiter_.valid(); } + public: explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) { // Explicitly reset the discard flag because we share this buffer. @@ -308,7 +313,7 @@ class io_buffer_t { /// Access the underlying buffer. /// This requires that the background fillthread be none. const separated_buffer_t &buffer() const { - assert(!fillthread_ && "Cannot access buffer during background fill"); + assert(!fillthread_running() && "Cannot access buffer during background fill"); return buffer_; }