Add and use type-erased RAII callback wrapper for ffi

This allows the rust code to free up C++ resources allocated for a callback even
when the callback isn't executed (as opposed to requiring the callback to run
and at the end of the callback cleaning up all allocated resources).

Also add type-erased destructor registration to callback_t. This allows for
freeing variables allocated by the callback for debounce_t's
perform_with_callback() that don't end up having their completion called due to
a timeout.
This commit is contained in:
Mahmoud Al-Qudsi 2023-04-26 10:29:38 -05:00
parent 6cd2d0ffed
commit ecf1676601
6 changed files with 160 additions and 155 deletions

View file

@ -118,7 +118,7 @@ set(FISH_SRCS
src/exec.cpp src/expand.cpp src/fallback.cpp src/fish_indent_common.cpp src/fish_version.cpp
src/flog.cpp src/function.cpp src/highlight.cpp
src/history.cpp src/history_file.cpp src/input.cpp src/input_common.cpp
src/io.cpp src/iothread.cpp src/kill.cpp
src/io.cpp src/kill.cpp
src/null_terminated_array.cpp src/operation_context.cpp src/output.cpp
src/pager.cpp src/parse_execution.cpp src/parse_util.cpp
src/parser.cpp src/parser_keywords.cpp src/path.cpp src/postfork.cpp

View file

@ -338,3 +338,21 @@ impl core::convert::From<*const autocxx::c_void> for void_ptr {
Self(value as *const _)
}
}
impl core::convert::From<void_ptr> for *const u8 {
fn from(value: void_ptr) -> Self {
value.0 as *const _
}
}
impl core::convert::From<void_ptr> for *const core::ffi::c_void {
fn from(value: void_ptr) -> Self {
value.0 as *const _
}
}
impl core::convert::From<void_ptr> for *const autocxx::c_void {
fn from(value: void_ptr) -> Self {
value.0 as *const _
}
}

View file

@ -51,6 +51,15 @@ static NOTIFY_SIGNALLER: once_cell::sync::Lazy<&'static crate::fd_monitor::FdEve
#[cxx::bridge]
mod ffi {
unsafe extern "C++" {
include!("callback.h");
#[rust_name = "CppCallback"]
type callback_t;
fn invoke(&self) -> *const u8;
fn invoke_with_param(&self, param: *const u8) -> *const u8;
}
extern "Rust" {
#[cxx_name = "ASSERT_IS_MAIN_THREAD"]
fn assert_is_main_thread();
@ -65,7 +74,7 @@ mod ffi {
extern "Rust" {
#[cxx_name = "make_detached_pthread"]
fn spawn_ffi(callback: *const u8, param: *const u8) -> bool;
fn spawn_ffi(callback: &SharedPtr<CppCallback>) -> bool;
}
extern "Rust" {
@ -76,9 +85,9 @@ mod ffi {
#[cxx_name = "iothread_drain_all"]
fn iothread_drain_all_ffi();
#[cxx_name = "iothread_perform"]
fn iothread_perform_ffi(callback: *const u8, param: *const u8);
fn iothread_perform_ffi(callback: &SharedPtr<CppCallback>);
#[cxx_name = "iothread_perform_cantwait"]
fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8);
fn iothread_perform_cant_wait_ffi(callback: &SharedPtr<CppCallback>);
}
extern "Rust" {
@ -86,14 +95,12 @@ mod ffi {
type Debounce;
#[cxx_name = "perform"]
fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64;
fn perform_ffi(&self, callback: &SharedPtr<CppCallback>) -> u64;
#[cxx_name = "perform_with_completion"]
fn perform_with_completion_ffi(
&self,
callback: *const u8,
param1: *const u8,
completion: *const u8,
param2: *const u8,
callback: &SharedPtr<CppCallback>,
completion: &SharedPtr<CppCallback>,
) -> u64;
#[cxx_name = "new_debounce_t"]
@ -101,6 +108,9 @@ mod ffi {
}
}
unsafe impl Send for ffi::CppCallback {}
unsafe impl Sync for ffi::CppCallback {}
fn iothread_service_main_with_timeout_ffi(timeout_usec: u64) {
iothread_service_main_with_timeout(Duration::from_micros(timeout_usec))
}
@ -109,23 +119,19 @@ fn iothread_drain_all_ffi() {
unsafe { iothread_drain_all() }
}
fn iothread_perform_ffi(callback: *const u8, param: *const u8) {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
fn iothread_perform_ffi(callback: &cxx::SharedPtr<ffi::CppCallback>) {
let callback = callback.clone();
iothread_perform(move || {
callback(param);
callback.invoke();
});
}
fn iothread_perform_cant_wait_ffi(callback: *const u8, param: *const u8) {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
fn iothread_perform_cant_wait_ffi(callback: &cxx::SharedPtr<ffi::CppCallback>) {
let callback = callback.clone();
iothread_perform_cant_wait(move || {
callback(param);
callback.invoke();
});
}
@ -286,13 +292,10 @@ pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
result
}
fn spawn_ffi(callback: *const u8, param: *const u8) -> bool {
type Callback = extern "C" fn(crate::ffi::void_ptr);
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
fn spawn_ffi(callback: &cxx::SharedPtr<ffi::CppCallback>) -> bool {
let callback = callback.clone();
spawn(move || {
callback(param);
callback.invoke();
})
}
@ -645,38 +648,29 @@ impl Debounce {
self.perform_inner(h)
}
fn perform_with_completion_ffi(
&self,
callback: *const u8,
param1: *const u8,
completion_callback: *const u8,
param2: *const u8,
) -> u64 {
type Callback = extern "C" fn(crate::ffi::void_ptr) -> crate::ffi::void_ptr;
type CompletionCallback = extern "C" fn(crate::ffi::void_ptr, crate::ffi::void_ptr);
fn perform_ffi(&self, callback: &cxx::SharedPtr<ffi::CppCallback>) -> u64 {
let callback = callback.clone();
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param1 = param1.into();
let completion_callback: CompletionCallback =
unsafe { std::mem::transmute(completion_callback) };
let param2 = param2.into();
self.perform_with_completion(
move || callback(param1),
move |result| completion_callback(param2, result),
)
self.perform(move || {
callback.invoke();
})
.into()
}
fn perform_ffi(&self, callback: *const u8, param: *const u8) -> u64 {
type Callback = extern "C" fn(crate::ffi::void_ptr);
fn perform_with_completion_ffi(
&self,
callback: &cxx::SharedPtr<ffi::CppCallback>,
completion: &cxx::SharedPtr<ffi::CppCallback>,
) -> u64 {
let callback = callback.clone();
let completion = completion.clone();
let callback: Callback = unsafe { std::mem::transmute(callback) };
let param = param.into();
self.perform(move || {
callback(param);
})
self.perform_with_completion(
move || -> crate::ffi::void_ptr { callback.invoke().into() },
move |result| {
completion.invoke_with_param(result.into());
},
)
.into()
}

49
src/callback.h Normal file
View file

@ -0,0 +1,49 @@
#pragma once
#include <cstdlib>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
/// A RAII callback container that can be used when the rust code needs to (or might need to) free
/// up the resources allocated for a callback (either the type-erased std::function wrapping the
/// lambda itself or the parameter to it.)
struct callback_t {
std::function<void *(const void *param)> callback;
std::vector<std::function<void()>> cleanups;
/// The default no-op constructor for the callback_t type.
callback_t() {
this->callback = [=](const void *) { return (void *)nullptr; };
}
/// Creates a new callback_t instance wrapping the specified type-erased std::function with an
/// optional parameter (defaulting to nullptr).
callback_t(std::function<void *(const void *param)> &&callback) {
this->callback = std::move(callback);
}
/// Executes the wrapped callback with the parameter stored at the time of creation and returns
/// the type-erased (void *) result, but cast to a `const uint8_t *` to please cxx::bridge.
const uint8_t *invoke() const {
const void *result = callback(nullptr);
return (const uint8_t *)result;
}
/// Executes the wrapped callback with the provided parameter and returns the type-erased
/// (void *) result, but cast to a `const uint8_t *` to please cxx::bridge.
const uint8_t *invoke_with_param(const uint8_t *param) const {
const void *result = callback((const void *)param);
return (const uint8_t *)result;
}
~callback_t() {
if (cleanups.size() > 0) {
for (const std::function<void()> &dtor : cleanups) {
(dtor)();
}
cleanups.clear();
}
}
};

View file

@ -1,17 +0,0 @@
#include "config.h" // IWYU pragma: keep
#include "iothread.h"
extern "C" const void *iothread_trampoline(const void *c) {
iothread_callback_t *callback = (iothread_callback_t *)c;
auto *result = (callback->callback)(callback->param);
delete callback;
return result;
}
extern "C" const void *iothread_trampoline2(const void *c, const void *p) {
iothread_callback_t *callback = (iothread_callback_t *)c;
auto *result = (callback->callback)(p);
delete callback;
return result;
}

View file

@ -1,123 +1,84 @@
// Handles IO that may hang.
#ifndef FISH_IOTHREAD_H
#define FISH_IOTHREAD_H
#if INCLUDE_RUST_HEADERS
#pragma once
#include <cstdlib>
#include <functional>
#include <memory>
#include <utility>
#include <cassert>
#include "callback.h"
#include "threads.rs.h"
struct iothread_callback_t {
std::function<void *(const void *param)> callback;
void *param;
~iothread_callback_t() {
if (param) {
free(param);
param = nullptr;
}
}
};
extern "C" const void *iothread_trampoline(const void *callback);
extern "C" const void *iothread_trampoline2(const void *callback, const void *param);
// iothread_perform invokes a handler on a background thread.
inline void iothread_perform(std::function<void()> &&func) {
auto callback = new iothread_callback_t{std::bind([=] {
func();
return nullptr;
}),
nullptr};
std::shared_ptr<callback_t> callback = std::make_shared<callback_t>([=](const void *) {
func();
return nullptr;
});
iothread_perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
iothread_perform(callback);
}
/// Variant of iothread_perform that disrespects the thread limit.
/// It does its best to spawn a new thread if all other threads are occupied.
/// This is for cases where deferring a new thread might lead to deadlock.
inline void iothread_perform_cantwait(std::function<void()> &&func) {
auto callback = new iothread_callback_t{std::bind([=] {
func();
return nullptr;
}),
nullptr};
std::shared_ptr<callback_t> callback = std::make_shared<callback_t>([=](const void *) {
func();
return nullptr;
});
iothread_perform_cantwait((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
iothread_perform_cantwait(callback);
}
inline uint64_t debounce_perform(const debounce_t &debouncer, const std::function<void()> &func) {
auto callback = new iothread_callback_t{std::bind([=] {
func();
return nullptr;
}),
nullptr};
std::shared_ptr<callback_t> callback = std::make_shared<callback_t>([=](const void *) {
func();
return nullptr;
});
return debouncer.perform((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
return debouncer.perform(callback);
}
template <typename R>
inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function<R()> &&func,
std::function<void(R)> &&completion) {
auto callback1 = new iothread_callback_t{[=](const void *) {
auto *result = new R(func());
return (void *)result;
},
nullptr};
std::shared_ptr<callback_t> callback2 = std::make_shared<callback_t>([=](const void *r) {
assert(r != nullptr && "callback1 result was null!");
const R *result = (const R *)r;
completion(*result);
return nullptr;
});
auto callback2 = new iothread_callback_t{
([=](const void *r) {
const R *result = (const R *)r;
completion(*result);
delete result;
return nullptr;
}),
nullptr,
};
std::shared_ptr<callback_t> callback1 = std::make_shared<callback_t>([=](const void *) {
const R *result = new R(func());
callback2->cleanups.push_back([result]() { delete result; });
return (void *)result;
});
debouncer.perform_with_completion(
(const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1,
(const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2);
debouncer.perform_with_completion(callback1, callback2);
}
template <typename R>
inline void debounce_perform_with_completion(const debounce_t &debouncer, std::function<R()> &&func,
std::function<void(const R &)> &&completion) {
auto callback1 = new iothread_callback_t{[=](const void *) {
auto *result = new R(func());
return (void *)result;
},
nullptr};
std::shared_ptr<callback_t> callback2 = std::make_shared<callback_t>([=](const void *r) {
assert(r != nullptr && "callback1 result was null!");
const R *result = (const R *)r;
completion(*result);
return nullptr;
});
auto callback2 = new iothread_callback_t{
([=](const void *r) {
const R *result = (const R *)r;
completion(*result);
delete result;
return nullptr;
}),
nullptr,
};
std::shared_ptr<callback_t> callback1 = std::make_shared<callback_t>([=](const void *) {
const R *result = new R(func());
callback2->cleanups.push_back([result]() { delete result; });
return (void *)result;
});
debouncer.perform_with_completion(
(const uint8_t *)&iothread_trampoline, (const uint8_t *)callback1,
(const uint8_t *)&iothread_trampoline2, (const uint8_t *)callback2);
debouncer.perform_with_completion(callback1, callback2);
}
inline bool make_detached_pthread(const std::function<void()> &func) {
auto callback = new iothread_callback_t{
[=](const void *) {
func();
return nullptr;
},
nullptr,
};
std::shared_ptr<callback_t> callback = std::make_shared<callback_t>([=](const void *) {
func();
return nullptr;
});
return make_detached_pthread((const uint8_t *)&iothread_trampoline, (const uint8_t *)callback);
return make_detached_pthread(callback);
}
#endif
#endif