Removed connection_t and associated functions

This commit is contained in:
ridiculousfish 2014-06-09 16:41:10 -07:00
parent 11c1562512
commit 17c2d76c5e
3 changed files with 41 additions and 456 deletions

View file

@ -10,11 +10,6 @@
#include "env_universal_common.h"
#include "env.h"
/**
Data about the universal variable server.
*/
extern connection_t env_universal_server;
/**
Initialize the envuni library
*/

View file

@ -47,21 +47,6 @@
*/
#define PARSE_ERR L"Unable to parse universal variable message: '%ls'"
/**
ERROR string for internal buffered reader
*/
#define ENV_UNIVERSAL_ERROR 0x100
/**
EAGAIN string for internal buffered reader
*/
#define ENV_UNIVERSAL_AGAIN 0x101
/**
EOF string for internal buffered reader
*/
#define ENV_UNIVERSAL_EOF 0x102
/** Small note about not editing ~/.fishd manually. Inserted at the top of all .fishd files. */
#define SAVE_MSG "# This file is automatically generated by the fish.\n# Do NOT edit it directly, your changes will be overwritten.\n"
@ -120,83 +105,11 @@ static void post_callbacks(const callback_data_list_t &callbacks)
}
}
/* UTF <-> wchar conversions. These return a string allocated with malloc. These call sites could be cleaned up substantially to eliminate the dependence on malloc. */
static wchar_t *utf2wcs(const char *input)
{
wchar_t *result = NULL;
wcstring converted;
if (utf8_to_wchar_string(input, &converted))
{
result = wcsdup(converted.c_str());
}
return result;
}
static char *wcs2utf(const wchar_t *input)
{
char *result = NULL;
std::string converted;
if (wchar_to_utf8_string(input, &converted))
{
result = strdup(converted.c_str());
}
return result;
}
void env_universal_common_init(void (*cb)(fish_message_type_t type, const wchar_t *key, const wchar_t *val))
{
callback = cb;
}
void read_message(connection_t *conn)
{
callback_data_list_t callbacks;
default_universal_vars().read_message(conn, &callbacks);
post_callbacks(callbacks);
}
/**
Read one byte of date form the specified connection
*/
static int read_byte(connection_t *src)
{
if (src->buffer_consumed >= src->read_buffer.size())
{
char local[ENV_UNIVERSAL_BUFFER_SIZE];
ssize_t res = read(src->fd, local, sizeof local);
// debug(4, L"Read chunk '%.*s'", res, src->buffer );
if (res < 0)
{
if (errno == EAGAIN ||
errno == EINTR)
{
return ENV_UNIVERSAL_AGAIN;
}
return ENV_UNIVERSAL_ERROR;
}
else if (res == 0)
{
return ENV_UNIVERSAL_EOF;
}
else
{
src->read_buffer.clear();
src->read_buffer.insert(src->read_buffer.begin(), local, local + res);
src->buffer_consumed = 0;
}
}
return src->read_buffer.at(src->buffer_consumed++);
}
/**
Remove variable with specified name
*/
@ -259,80 +172,6 @@ static void report_error(int err_code, const wchar_t *err_format, ...)
fwprintf(stderr, L"%s\n", strerror(err_code));
}
/**
Attempt to send the specified message to the specified file descriptor
\return 1 on sucess, 0 if the message could not be sent without blocking and -1 on error
*/
static int try_send(message_t *msg,
int fd)
{
debug(3,
L"before write of %d chars to fd %d", msg->body.size(), fd);
ssize_t res = write(fd, msg->body.c_str(), msg->body.size());
if (res != -1)
{
debug(4, L"Wrote message '%s'", msg->body.c_str());
}
else
{
debug(4, L"Failed to write message '%s'", msg->body.c_str());
}
if (res == -1)
{
switch (errno)
{
case EAGAIN:
return 0;
default:
debug(2,
L"Error while sending universal variable message to fd %d. Closing connection",
fd);
if (debug_level > 2)
wperror(L"write");
return -1;
}
}
msg->count--;
if (!msg->count)
{
delete msg;
}
return 1;
}
void try_send_all(connection_t *c)
{
/* debug( 3,
L"Send all updates to connection on fd %d",
c->fd );*/
while (!c->unsent.empty())
{
switch (try_send(c->unsent.front(), c->fd))
{
case 1:
c->unsent.pop();
break;
case 0:
debug(4,
L"Socket full, send rest later");
return;
case -1:
c->killme = 1;
return;
}
}
}
/* The universal variable format has some funny escaping requirements; here we try to be safe */
static bool is_universal_safe_to_encode_directly(wchar_t c)
{
@ -372,28 +211,6 @@ static wcstring full_escape(const wchar_t *in)
return out;
}
/* Sets the body of a message to the null-terminated list of null terminated const char *. */
void set_body(message_t *msg, ...)
{
/* Start by counting the length of all the strings */
size_t body_len = 0;
const char *arg;
va_list arg_list;
va_start(arg_list, msg);
while ((arg = va_arg(arg_list, const char *)) != NULL)
body_len += strlen(arg);
va_end(arg_list);
/* Reserve that length in the string */
msg->body.reserve(body_len + 1); //+1 for trailing NULL? Do I need that?
/* Set the string contents */
va_start(arg_list, msg);
while ((arg = va_arg(arg_list, const char *)) != NULL)
msg->body.append(arg);
va_end(arg_list);
}
/* Converts input to UTF-8 and appends it to receiver, using storage as temp storage */
static bool append_utf8(const wcstring &input, std::string *receiver, std::string *storage)
{
@ -460,66 +277,6 @@ static bool append_file_entry(fish_message_type_t type, const wcstring &key_in,
return success;
}
/* Returns an instance of message_t allocated via new */
message_t *create_message(fish_message_type_t type,
const wchar_t *key_in,
const wchar_t *val_in)
{
char *key = NULL;
// debug( 4, L"Crete message of type %d", type );
if (key_in)
{
if (wcsvarname(key_in))
{
debug(0, L"Illegal variable name: '%ls'", key_in);
return NULL;
}
key = wcs2utf(key_in);
if (!key)
{
debug(0,
L"Could not convert %ls to narrow character string",
key_in);
return NULL;
}
}
message_t *msg = new message_t;
msg->count = 0;
switch (type)
{
case SET:
case SET_EXPORT:
{
if (!val_in)
{
val_in=L"";
}
wcstring esc = full_escape(val_in);
char *val = wcs2utf(esc.c_str());
set_body(msg, (type==SET?SET_MBS:SET_EXPORT_MBS), " ", key, ":", val, "\n", NULL);
free(val);
break;
}
default:
{
debug(0, L"create_message: Unknown message type");
}
}
free(key);
// debug( 4, L"Message body is '%s'", msg->body );
return msg;
}
/**
Put exported or unexported variables in a string list
*/
@ -540,19 +297,6 @@ bool env_universal_common_get_export(const wcstring &name)
return default_universal_vars().get_export(name);
}
void enqueue_all(connection_t *c)
{
default_universal_vars().enqueue_all(c);
}
connection_t::connection_t(int input_fd) :
fd(input_fd),
killme(false),
buffer_consumed(0)
{
}
env_universal_t::env_universal_t(const wcstring &path) : explicit_vars_path(path), tried_renaming(false), last_read_file(kInvalidFileID)
{
VOMIT_ON_FAILURE(pthread_mutex_init(&lock, NULL));
@ -653,28 +397,6 @@ wcstring_list_t env_universal_t::get_names(bool show_exported, bool show_unexpor
return result;
}
void env_universal_t::enqueue_all_internal(connection_t *c) const
{
ASSERT_IS_LOCKED(lock);
var_table_t::const_iterator iter;
for (iter = vars.begin(); iter != vars.end(); ++iter)
{
const wcstring &key = iter->first;
const var_entry_t &entry = iter->second;
message_t *msg = create_message(entry.exportv ? SET_EXPORT : SET, key.c_str(), entry.val.c_str());
msg->count=1;
c->unsent.push(msg);
fprintf(stderr, "%s", msg->body.c_str());
}
try_send_all(c);
}
void env_universal_t::enqueue_all(connection_t *c) const
{
scoped_lock locker(lock);
enqueue_all_internal(c);
}
void env_universal_t::erase_unmodified_values()
{
@ -710,9 +432,8 @@ void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks)
{
/* Unmodified values are sourced from the file. Since we are about to read a different file, erase them */
this->erase_unmodified_values();
connection_t c(fd);
/* Read from the file. Do not destroy the connection; the caller is responsible for closing the fd. */
this->read_message_internal(&c, callbacks);
/* Read from the file. */
this->read_message_internal(fd, callbacks);
last_read_file = current_file;
}
}
@ -1109,94 +830,61 @@ bool env_universal_t::sync(callback_data_list_t *callbacks)
return success;
}
void env_universal_t::read_message_internal(connection_t *src, callback_data_list_t *callbacks)
void env_universal_t::read_message_internal(int fd, callback_data_list_t *callbacks)
{
ASSERT_IS_LOCKED(lock);
while (1)
// The line we construct (and then parse)
std::string line;
wcstring wide_line;
for (;;)
{
int ib = read_byte(src);
char b;
switch (ib)
// Read into a buffer. Note this is NOT null-terminated!
char buffer[1024];
ssize_t amt = read_loop(fd, buffer, sizeof buffer);
if (amt <= 0)
{
case ENV_UNIVERSAL_AGAIN:
break;
}
// Walk over it by lines. The contents of an unterminated line will be left in 'line' for the next iteration.
size_t line_start = 0;
while (line_start < sizeof buffer)
{
// Run until we hit a newline
size_t cursor = line_start;
while (cursor < sizeof buffer && buffer[cursor] != '\n')
{
return;
cursor++;
}
case ENV_UNIVERSAL_ERROR:
// Copy over what we read
line.append(buffer + line_start, cursor - line_start);
// Process it if it's a newline (which is true if we are before the end of the buffer)
if (cursor < sizeof buffer && ! line.empty())
{
debug(2, L"Read error on fd %d, set killme flag", src->fd);
if (debug_level > 2)
wperror(L"read");
src->killme = 1;
return;
}
case ENV_UNIVERSAL_EOF:
{
src->killme = 1;
debug(3, L"Fd %d has reached eof, set killme flag", src->fd);
if (! src->input.empty())
if (utf8_to_wchar_string(line, &wide_line))
{
char c = 0;
src->input.push_back(c);
debug(1,
L"Universal variable connection closed while reading command. Partial command recieved: '%s'",
&src->input.at(0));
wchar_t *tmp = wcsdup(wide_line.c_str());
this->parse_message_internal(tmp, callbacks);
free(tmp);
}
return;
}
}
b = (char)ib;
if (b == '\n')
{
wchar_t *msg;
b = 0;
src->input.push_back(b);
msg = utf2wcs(&src->input.at(0));
/*
Before calling parse_message, we must empty reset
everything, since the callback function could
potentially call read_message.
*/
src->input.clear();
if (msg)
{
this->parse_message_internal(msg, src, callbacks);
}
else
{
debug(0, _(L"Could not convert message '%s' to wide character string"), &src->input.at(0));
line.clear();
}
free(msg);
}
else
{
src->input.push_back(b);
// Skip over the newline (or skip past the end)
line_start = cursor + 1;
}
}
}
void env_universal_t::read_message(connection_t *src, callback_data_list_t *callbacks)
{
scoped_lock locker(lock);
return read_message_internal(src, callbacks);
// We make no effort to handle an unterminated last line
}
/**
Parse message msg
*/
void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks)
void env_universal_t::parse_message_internal(wchar_t *msg, callback_data_list_t *callbacks)
{
ASSERT_IS_LOCKED(lock);

View file

@ -33,85 +33,6 @@ typedef enum
*/
#define ENV_UNIVERSAL_BUFFER_SIZE 1024
/**
A struct representing a message to be sent between client and server
*/
typedef struct
{
/**
Number of queues that contain this message. Once this reaches zero, the message should be deleted
*/
int count;
/**
Message body.
*/
std::string body;
} message_t;
typedef std::queue<message_t *> message_queue_t;
/**
This struct represents a connection between a universal variable server/client
*/
class connection_t
{
private:
/* No assignment */
connection_t &operator=(const connection_t &);
public:
/**
The file descriptor this socket lives on
*/
int fd;
/**
Queue of unsent messages
*/
std::queue<message_t *> unsent;
/**
Set to one when this connection should be killed
*/
bool killme;
/**
The input string. Input from the socket goes here. When a
newline is encountered, the buffer is parsed and cleared.
*/
std::vector<char> input;
/**
The read buffer.
*/
std::vector<char> read_buffer;
/**
Number of bytes that have already been consumed.
*/
size_t buffer_consumed;
/* Constructor */
connection_t(int input_fd);
};
/**
Read all available messages on this connection
*/
void read_message(connection_t *);
/**
Send as many messages as possible without blocking to the connection
*/
void try_send_all(connection_t *c);
/**
Create a messge with the specified properties
*/
message_t *create_message(fish_message_type_t type, const wchar_t *key, const wchar_t *val);
/**
Init the library
*/
@ -169,18 +90,6 @@ bool env_universal_common_get_export(const wcstring &name);
/** Synchronizes all changse: writes everything out, reads stuff in */
void env_universal_common_sync();
/**
Add messages about all existing variables to the specified connection
*/
void enqueue_all(connection_t *c);
/**
Close and destroy the specified connection struct. This frees
allstructures allocated by the connection, such as ques of unsent
messages.
*/
void connection_destroy(connection_t *c);
typedef std::vector<struct callback_data_t> callback_data_list_t;
/** Class representing universal variables */
@ -201,7 +110,7 @@ class env_universal_t
void load_from_fd(int fd, callback_data_list_t *callbacks);
void erase_unmodified_values();
void parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks);
void parse_message_internal(wchar_t *msg, callback_data_list_t *callbacks);
void set_internal(const wcstring &key, const wcstring &val, bool exportv, bool overwrite);
void remove_internal(const wcstring &name, bool overwrite);
@ -215,8 +124,7 @@ class env_universal_t
/* File id from which we last read */
file_id_t last_read_file;
void read_message_internal(connection_t *src, callback_data_list_t *callbacks);
void enqueue_all_internal(connection_t *c) const;
void read_message_internal(int fd, callback_data_list_t *callbacks);
public:
env_universal_t(const wcstring &path);
@ -237,17 +145,11 @@ public:
/* Gets variable names */
wcstring_list_t get_names(bool show_exported, bool show_unexported) const;
/* Writes variables to the connection */
void enqueue_all(connection_t *c) const;
/** Loads variables at the correct path */
bool load();
/** Reads and writes variables at the correct path. Returns true if modified variables were written. */
bool sync(callback_data_list_t *callbacks);
/* Internal use */
void read_message(connection_t *src, callback_data_list_t *callbacks);
};
/** The "universal notifier" is an object responsible for broadcasting and receiving universal variable change notifications. These notifications do not contain the change, but merely indicate that the uvar file has changed. It is up to the uvar subsystem to re-read the file.