Move universal variable callbacks out from under the lock, to avoid a

possible deadlock on reentrancy
This commit is contained in:
ridiculousfish 2014-04-29 11:28:00 -07:00
parent ffc23046a2
commit a949f0b0c3
3 changed files with 61 additions and 60 deletions

View file

@ -120,10 +120,34 @@ static env_universal_t &default_universal_vars()
/** /**
Callback function, should be called on all events Callback function, should be called on all events
*/ */
struct callback_data_t
{
fish_message_type_t type;
wcstring key;
wcstring val;
callback_data_t(fish_message_type_t t, const wcstring &k, const wcstring &v) : type(t), key(k), val(v)
{
}
};
static void (*callback)(fish_message_type_t type, static void (*callback)(fish_message_type_t type,
const wchar_t *key, const wchar_t *key,
const wchar_t *val); const wchar_t *val);
/* Post callbacks that we have determined in this list. We do this here, instead of at the point where we determined that the values changed, because we determine those under a lock, and reentrancy would cause a deadlock */
static void post_callbacks(const callback_data_list_t &callbacks)
{
if (callback != NULL)
{
for (size_t i=0; i < callbacks.size(); i++)
{
const callback_data_t &data = callbacks.at(i);
callback(data.type, data.key.c_str(), data.val.c_str());
}
}
}
/* UTF <-> wchar conversions. These return a string allocated with malloc. These call sites could be cleaned up substantially to eliminate the dependence on malloc. */ /* UTF <-> wchar conversions. These return a string allocated with malloc. These call sites could be cleaned up substantially to eliminate the dependence on malloc. */
static wchar_t *utf2wcs(const char *input) static wchar_t *utf2wcs(const char *input)
{ {
@ -154,7 +178,9 @@ void env_universal_common_init(void (*cb)(fish_message_type_t type, const wchar_
void read_message(connection_t *conn) void read_message(connection_t *conn)
{ {
return default_universal_vars().read_message(conn); callback_data_list_t callbacks;
default_universal_vars().read_message(conn, &callbacks);
post_callbacks(callbacks);
} }
/** /**
@ -237,7 +263,9 @@ void env_universal_common_set(const wchar_t *key, const wchar_t *val, bool expor
void env_universal_common_sync() void env_universal_common_sync()
{ {
default_universal_vars().sync(); callback_data_list_t callbacks;
default_universal_vars().sync(&callbacks);
post_callbacks(callbacks);
} }
/** /**
@ -619,7 +647,7 @@ void env_universal_t::enqueue_all(connection_t *c) const
enqueue_all_internal(c); enqueue_all_internal(c);
} }
void env_universal_t::load_from_fd(int fd) void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks)
{ {
ASSERT_IS_LOCKED(lock); ASSERT_IS_LOCKED(lock);
assert(fd >= 0); assert(fd >= 0);
@ -629,12 +657,12 @@ void env_universal_t::load_from_fd(int fd)
{ {
connection_t c(fd); connection_t c(fd);
/* Read from the file. Do not destroy the connection; the caller is responsible for closing the fd. */ /* Read from the file. Do not destroy the connection; the caller is responsible for closing the fd. */
this->read_message_internal(&c); this->read_message_internal(&c, callbacks);
last_read_file = current_file; last_read_file = current_file;
} }
} }
bool env_universal_t::load_from_path(const wcstring &path) bool env_universal_t::load_from_path(const wcstring &path, callback_data_list_t *callbacks)
{ {
ASSERT_IS_LOCKED(lock); ASSERT_IS_LOCKED(lock);
/* OK to not use CLO_EXEC here because fishd is single threaded */ /* OK to not use CLO_EXEC here because fishd is single threaded */
@ -642,7 +670,7 @@ bool env_universal_t::load_from_path(const wcstring &path)
int fd = wopen_cloexec(path, O_RDONLY); int fd = wopen_cloexec(path, O_RDONLY);
if (fd >= 0) if (fd >= 0)
{ {
this->load_from_fd(fd); this->load_from_fd(fd, callbacks);
close(fd); close(fd);
result = true; result = true;
} }
@ -732,8 +760,9 @@ static wcstring fishd_get_config()
bool env_universal_t::load() bool env_universal_t::load()
{ {
scoped_lock locker(lock); scoped_lock locker(lock);
callback_data_list_t callbacks;
const wcstring vars_path = explicit_vars_path.empty() ? default_vars_path() : explicit_vars_path; const wcstring vars_path = explicit_vars_path.empty() ? default_vars_path() : explicit_vars_path;
bool success = load_from_path(vars_path); bool success = load_from_path(vars_path, &callbacks);
if (! success && ! tried_renaming && errno == ENOENT) if (! success && ! tried_renaming && errno == ENOENT)
{ {
/* We failed to load, because the file was not found. Older fish used the hostname only. Try *moving* the filename based on the hostname into place; if that succeeds try again. Silently "upgraded." */ /* We failed to load, because the file was not found. Older fish used the hostname only. Try *moving* the filename based on the hostname into place; if that succeeds try again. Silently "upgraded." */
@ -865,7 +894,7 @@ bool env_universal_t::open_and_acquire_lock(const wcstring &path, int *out_fd)
return result_fd >= 0; return result_fd >= 0;
} }
bool env_universal_t::sync() bool env_universal_t::sync(callback_data_list_t *callbacks)
{ {
scoped_lock locker(lock); scoped_lock locker(lock);
/* Our saving strategy: /* Our saving strategy:
@ -894,7 +923,7 @@ bool env_universal_t::sync()
/* If we have no changes, just load */ /* If we have no changes, just load */
if (modified.empty()) if (modified.empty())
{ {
return this->load_from_path(vars_path); return this->load_from_path(vars_path, callbacks);
} }
const wcstring directory = wdirname(vars_path); const wcstring directory = wdirname(vars_path);
@ -912,7 +941,7 @@ bool env_universal_t::sync()
/* Read from it */ /* Read from it */
assert(vars_fd >= 0); assert(vars_fd >= 0);
this->load_from_fd(vars_fd); this->load_from_fd(vars_fd, callbacks);
/* Open adjacent temporary file */ /* Open adjacent temporary file */
if (! this->open_temporary_file(directory, &private_file_path, &private_fd)) if (! this->open_temporary_file(directory, &private_file_path, &private_fd))
@ -959,7 +988,7 @@ bool env_universal_t::sync()
return success; return success;
} }
void env_universal_t::read_message_internal(connection_t *src) void env_universal_t::read_message_internal(connection_t *src, callback_data_list_t *callbacks)
{ {
ASSERT_IS_LOCKED(lock); ASSERT_IS_LOCKED(lock);
while (1) while (1)
@ -1020,7 +1049,7 @@ void env_universal_t::read_message_internal(connection_t *src)
if (msg) if (msg)
{ {
this->parse_message_internal(msg, src); this->parse_message_internal(msg, src, callbacks);
} }
else else
{ {
@ -1037,16 +1066,16 @@ void env_universal_t::read_message_internal(connection_t *src)
} }
} }
void env_universal_t::read_message(connection_t *src) void env_universal_t::read_message(connection_t *src, callback_data_list_t *callbacks)
{ {
scoped_lock locker(lock); scoped_lock locker(lock);
return read_message_internal(src); return read_message_internal(src, callbacks);
} }
/** /**
Parse message msg Parse message msg
*/ */
void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src) void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks)
{ {
ASSERT_IS_LOCKED(lock); ASSERT_IS_LOCKED(lock);
@ -1073,6 +1102,10 @@ void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src)
if (unescape_string(tmp + 1, &val, 0)) if (unescape_string(tmp + 1, &val, 0))
{ {
this->set_internal(key, val, exportv, false); this->set_internal(key, val, exportv, false);
if (callbacks != NULL)
{
callbacks->push_back(callback_data_t(exportv ? SET_EXPORT:SET, key, val));
}
} }
} }
else else
@ -1100,11 +1133,9 @@ void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src)
} }
this->remove_internal(name, false); this->remove_internal(name, false);
if (callbacks != NULL)
#warning We're locked when this is invoked - bad news!
if (callback)
{ {
callback(ERASE, name, 0); callbacks->push_back(callback_data_t(ERASE, name, wcstring()));
} }
} }
else if (match(msg, BARRIER_STR)) else if (match(msg, BARRIER_STR))
@ -1116,9 +1147,9 @@ void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src)
} }
else if (match(msg, BARRIER_REPLY_STR)) else if (match(msg, BARRIER_REPLY_STR))
{ {
if (callback) if (callbacks != NULL)
{ {
callback(BARRIER_REPLY, 0, 0); callbacks->push_back(callback_data_t(BARRIER_REPLY, wcstring(), wcstring()));
} }
} }
else else
@ -1127,8 +1158,6 @@ void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src)
} }
} }
static std::string get_variables_file_path(const std::string &dir, const std::string &identifier) static std::string get_variables_file_path(const std::string &dir, const std::string &identifier)
{ {
std::string name; std::string name;
@ -1139,36 +1168,6 @@ static std::string get_variables_file_path(const std::string &dir, const std::st
return name; return name;
} }
static bool load_or_save_variables_at_path(bool save, const std::string &path)
{
bool result = false;
/* OK to not use CLO_EXEC here because fishd is single threaded */
int fd = open(path.c_str(), save?(O_CREAT | O_TRUNC | O_WRONLY):O_RDONLY, 0600);
if (fd >= 0)
{
/* Success */
result = true;
connection_t c(fd);
if (save)
{
/* Save to the file */
write_loop(c.fd, SAVE_MSG, strlen(SAVE_MSG));
enqueue_all(&c);
}
else
{
/* Read from the file */
read_message(&c);
}
connection_destroy(&c);
}
return result;
}
/** /**
Maximum length of hostname. Longer hostnames are truncated Maximum length of hostname. Longer hostnames are truncated
*/ */

View file

@ -204,6 +204,8 @@ void enqueue_all(connection_t *c);
*/ */
void connection_destroy(connection_t *c); void connection_destroy(connection_t *c);
typedef std::vector<struct callback_data_t> callback_data_list_t;
/** Class representing universal variables */ /** Class representing universal variables */
class env_universal_t class env_universal_t
{ {
@ -218,10 +220,10 @@ class env_universal_t
mutable pthread_mutex_t lock; mutable pthread_mutex_t lock;
bool tried_renaming; bool tried_renaming;
bool load_from_path(const wcstring &path); bool load_from_path(const wcstring &path, callback_data_list_t *callbacks);
void load_from_fd(int fd); void load_from_fd(int fd, callback_data_list_t *callbacks);
void parse_message_internal(wchar_t *msg, connection_t *src); void parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks);
void set_internal(const wcstring &key, const wcstring &val, bool exportv, bool overwrite); void set_internal(const wcstring &key, const wcstring &val, bool exportv, bool overwrite);
void remove_internal(const wcstring &name, bool overwrite); void remove_internal(const wcstring &name, bool overwrite);
@ -235,7 +237,7 @@ class env_universal_t
/* File id from which we last read */ /* File id from which we last read */
file_id_t last_read_file; file_id_t last_read_file;
void read_message_internal(connection_t *src); void read_message_internal(connection_t *src, callback_data_list_t *callbacks);
void enqueue_all_internal(connection_t *c) const; void enqueue_all_internal(connection_t *c) const;
public: public:
@ -264,10 +266,10 @@ public:
bool load(); bool load();
/** Reads and writes variables at the correct path */ /** Reads and writes variables at the correct path */
bool sync(); bool sync(callback_data_list_t *callbacks);
/* Internal use */ /* Internal use */
void read_message(connection_t *src); void read_message(connection_t *src, callback_data_list_t *callbacks);
}; };
std::string get_machine_identifier(); std::string get_machine_identifier();

View file

@ -2170,7 +2170,7 @@ static int test_universal_helper(int *x)
const wcstring key = format_string(L"key_%d_%d", *x, j); const wcstring key = format_string(L"key_%d_%d", *x, j);
const wcstring val = format_string(L"val_%d_%d", *x, j); const wcstring val = format_string(L"val_%d_%d", *x, j);
uvars.set(key, val, false); uvars.set(key, val, false);
bool synced = uvars.sync(); bool synced = uvars.sync(NULL);
if (! synced) if (! synced)
{ {
err(L"Failed to sync universal variables"); err(L"Failed to sync universal variables");