From fdfa5c06028d3473f57cea497a6682a8e550352f Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 27 Feb 2012 18:43:24 -0800 Subject: [PATCH] Some initial work towards resolving nasty fork/pthread issues, and to having a per-parser job list --- builtin.cpp | 2 +- builtin_jobs.cpp | 8 +-- common.cpp | 47 ++++++++++++++++- common.h | 41 +++++---------- exec.cpp | 4 ++ expand.cpp | 2 +- fish.cpp | 4 +- iothread.cpp | 4 ++ parser.cpp | 67 +++++++++++++++++++++++- parser.h | 53 ++++++++++--------- proc.cpp | 132 ++++++++++++++++++++++++++--------------------- proc.h | 29 ++++++----- reader.cpp | 2 +- 13 files changed, 258 insertions(+), 137 deletions(-) diff --git a/builtin.cpp b/builtin.cpp index f9eee831f..bdbf270ff 100644 --- a/builtin.cpp +++ b/builtin.cpp @@ -1133,7 +1133,7 @@ static void functions_def( const wcstring &name, wcstring &out ) case EVENT_JOB_ID: { - job_t *j = job_get( next->param1.job_id ); + const job_t *j = job_get( next->param1.job_id ); if( j ) append_format( out, L" --on-job-exit %d", j->pgid ); break; diff --git a/builtin_jobs.cpp b/builtin_jobs.cpp index 1aecf51db..86e4b0768 100644 --- a/builtin_jobs.cpp +++ b/builtin_jobs.cpp @@ -71,7 +71,7 @@ static int cpu_use( job_t *j ) /** Print information about the specified job */ -static void builtin_jobs_print( job_t *j, int mode, int header ) +static void builtin_jobs_print( const job_t *j, int mode, int header ) { process_t *p; switch( mode ) @@ -164,7 +164,7 @@ static int builtin_jobs( parser_t &parser, wchar_t **argv ) int found=0; int mode=JOBS_DEFAULT; int print_last = 0; - job_t *j; + const job_t *j; argc = builtin_count_args( argv ); woptind=0; @@ -270,7 +270,7 @@ static int builtin_jobs( parser_t &parser, wchar_t **argv ) Ignore unconstructed jobs, i.e. ourself. */ job_iterator_t jobs; - job_t *j; + const job_t *j; while ((j = jobs.next())) { @@ -324,7 +324,7 @@ static int builtin_jobs( parser_t &parser, wchar_t **argv ) else { job_iterator_t jobs; - job_t *j; + const job_t *j; while ((j = jobs.next())) { /* diff --git a/common.cpp b/common.cpp index 217a63c0c..95de387b4 100644 --- a/common.cpp +++ b/common.cpp @@ -2058,6 +2058,22 @@ void set_main_thread() { main_thread_id = pthread_self(); } + +/* Notice when we've forked */ +static bool is_child_of_fork = false; +static void child_note_forked(void) { + is_child_of_fork = true; +} + +bool is_forked_child(void) { + return is_child_of_fork; +} + +void setup_fork_guards(void) { + /* Notice when we fork */ + pthread_atfork(NULL /* prepare */, NULL /* parent */, child_note_forked); +} + static bool is_main_thread() { assert (main_thread_id != 0); return main_thread_id == pthread_self(); @@ -2068,7 +2084,14 @@ void assert_is_main_thread(const char *who) if (! is_main_thread()) { fprintf(stderr, "Warning: %s called off of main thread. Break on debug_thread_error to debug.\n", who); debug_thread_error(); - sleep(1000); + } +} + +void assert_is_not_forked_child(const char *who) +{ + if (is_forked_child()) { + fprintf(stderr, "Warning: %s called in a forked child. Break on debug_thread_error to debug.\n", who); + debug_thread_error(); } } @@ -2089,3 +2112,25 @@ void assert_is_locked(void *vmutex, const char *who) pthread_mutex_unlock(mutex); } } + +void scoped_lock::lock(void) { + assert(! locked); + assert(! is_forked_child()); + VOMIT_ON_FAILURE(pthread_mutex_lock(lock_obj)); + locked = true; +} + +void scoped_lock::unlock(void) { + assert(locked); + assert(! is_forked_child()); + VOMIT_ON_FAILURE(pthread_mutex_unlock(lock_obj)); + locked = false; +} + +scoped_lock::scoped_lock(pthread_mutex_t &mutex) : lock_obj(&mutex), locked(false) { + this->lock(); +} + +scoped_lock::~scoped_lock() { + if (locked) this->unlock(); +} diff --git a/common.h b/common.h index 51c2e1eab..f3f4d577f 100644 --- a/common.h +++ b/common.h @@ -301,30 +301,16 @@ wcstring to_string(const T &x) { return stream.str(); } +bool is_forked_child(); + class scoped_lock { pthread_mutex_t *lock_obj; bool locked; public: - - void lock(void) { - assert(! locked); - VOMIT_ON_FAILURE(pthread_mutex_lock(lock_obj)); - locked = true; - } - - void unlock(void) { - assert(locked); - VOMIT_ON_FAILURE(pthread_mutex_unlock(lock_obj)); - locked = false; - } - - scoped_lock(pthread_mutex_t &mutex) : lock_obj(&mutex), locked(false) { - this->lock(); - } - - ~scoped_lock() { - if (locked) this->unlock(); - } + void lock(void); + void unlock(void); + scoped_lock(pthread_mutex_t &mutex); + ~scoped_lock(); }; class wcstokenizer { @@ -597,14 +583,13 @@ double timef(); */ void set_main_thread(); -/** - -*/ - -/** - - */ +/** Set up a guard to complain if we try to do certain things (like take a lock) after calling fork */ +void setup_fork_guards(void); +/** Return whether we are the child of a fork */ +bool is_forked_child(void); +void assert_is_not_forked_child(const char *who); +#define ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(x) assert_is_not_forked_child(x) +#define ASSERT_IS_NOT_FORKED_CHILD() ASSERT_IS_NOT_FORKED_CHILD_TRAMPOLINE(__FUNCTION__) #endif - diff --git a/exec.cpp b/exec.cpp index 3de094b99..85a3696a5 100644 --- a/exec.cpp +++ b/exec.cpp @@ -831,6 +831,8 @@ static int set_child_group( job_t *j, process_t *p, int print_errors ) */ static pid_t exec_fork() { + ASSERT_IS_MAIN_THREAD(); + pid_t pid; struct timespec pollint; int i; @@ -1047,6 +1049,7 @@ void exec( parser_t &parser, job_t *j ) if( keepalive.pid == 0 ) { + /* Child */ keepalive.pid = getpid(); set_child_group( j, &keepalive, 1 ); pause(); @@ -1054,6 +1057,7 @@ void exec( parser_t &parser, job_t *j ) } else { + /* Parent */ set_child_group( j, &keepalive, 0 ); } } diff --git a/expand.cpp b/expand.cpp index 08cee55f7..4b2011954 100644 --- a/expand.cpp +++ b/expand.cpp @@ -334,7 +334,7 @@ static int find_process( const wchar_t *proc, int sz=0; int found = 0; - job_t *j; + const job_t *j; if( iswnumeric(proc) || (wcslen(proc)==0) ) { diff --git a/fish.cpp b/fish.cpp index 29c1ccf42..67448e00a 100644 --- a/fish.cpp +++ b/fish.cpp @@ -260,7 +260,6 @@ static int fish_parse_opt( int argc, char **argv, const char **cmd_ptr ) return my_optind; } - /** Calls a bunch of init functions, parses the init files and then parses commands from stdin or files, depending on arguments @@ -274,7 +273,8 @@ int main( int argc, char **argv ) int my_optind=0; set_main_thread(); - + setup_fork_guards(); + wsetlocale( LC_ALL, L"" ); is_interactive_session=1; program_name=L"fish"; diff --git a/iothread.cpp b/iothread.cpp index ee122369f..b98dd8f24 100644 --- a/iothread.cpp +++ b/iothread.cpp @@ -1,4 +1,6 @@ +#include "config.h" #include "iothread.h" +#include "common.h" #include #include #include @@ -139,6 +141,8 @@ static void iothread_spawn_if_needed(void) { } int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int), void *context) { + ASSERT_IS_MAIN_THREAD(); + ASSERT_IS_NOT_FORKED_CHILD(); iothread_init(); /* Create and initialize a request. */ diff --git a/parser.cpp b/parser.cpp index 8c55943ca..01dbb627b 100644 --- a/parser.cpp +++ b/parser.cpp @@ -371,6 +371,7 @@ parser_t::parser_t(enum parser_type_t type) : parser_t &parser_t::principal_parser(void) { + ASSERT_IS_NOT_FORKED_CHILD(); ASSERT_IS_MAIN_THREAD(); static parser_t parser(PARSER_TYPE_GENERAL); return parser; @@ -764,6 +765,12 @@ int parser_t::eval_args( const wchar_t *line, std::vector &args ) tokenizer tok; const bool show_errors = (this->parser_type == PARSER_TYPE_GENERAL || this->parser_type == PARSER_TYPE_ERRORS_ONLY); + expand_flags_t eflags = 0; + if (! show_errors) + eflags |= EXPAND_NO_DESCRIPTIONS; + if (this->parser_type != PARSER_TYPE_GENERAL) + eflags |= EXPAND_SKIP_CMDSUBST; + /* eval_args may be called while evaulating another command, so we save the previous tokenizer and restore it on exit @@ -799,7 +806,7 @@ int parser_t::eval_args( const wchar_t *line, std::vector &args ) DIE_MEM(); } - if( expand_string( tmp, args, (show_errors ? 0 : EXPAND_NO_DESCRIPTIONS) ) == EXPAND_ERROR ) + if( expand_string( tmp, args, eflags ) == EXPAND_ERROR ) { err_pos=tok_get_pos( &tok ); do_loop=0; @@ -1210,6 +1217,62 @@ int parser_t::is_help( const wchar_t *s, int min_match ) const ( len >= min_match && (wcsncmp( L"--help", s, len ) == 0) ); } +job_t *parser_t::job_create(void) +{ + job_t *res = new job_t(acquire_job_id()); + this->my_job_list.push_front(res); + + job_set_flag( res, + JOB_CONTROL, + (job_control_mode==JOB_CONTROL_ALL) || + ((job_control_mode == JOB_CONTROL_INTERACTIVE) && (get_is_interactive())) ); + return res; +} + +bool parser_t::job_remove( job_t *j ) +{ + job_list_t::iterator iter = std::find(my_job_list.begin(), my_job_list.end(), j); + if (iter != my_job_list.end()) { + my_job_list.erase(iter); + return true; + } else { + debug( 1, _( L"Job inconsistency" ) ); + sanity_lose(); + return false; + } +} + +void parser_t::job_promote(job_t *job) +{ + job_list_t::iterator loc = find(my_job_list.begin(), my_job_list.end(), job); + assert(loc != my_job_list.end()); + + /* Move the job to the beginning */ + my_job_list.splice(my_job_list.begin(), my_job_list, loc); +} + +job_t *parser_t::job_get(job_id_t id) +{ + job_iterator_t jobs(my_job_list); + job_t *job; + while ((job = jobs.next())) { + if( id <= 0 || job->job_id == id) + return job; + } + return NULL; +} + +job_t *parser_t::job_get_from_pid( int pid ) +{ + job_iterator_t jobs; + job_t *job; + while ((job = jobs.next())) { + if( job->pgid == pid ) + return job; + } + return 0; +} + /** Parse options for the specified job @@ -2212,7 +2275,7 @@ void parser_t::eval_job( tokenizer *tok ) { case TOK_STRING: { - j = job_create(); + j = this->job_create(); job_set_flag( j, JOB_FOREGROUND, 1 ); job_set_flag( j, JOB_TERMINAL, job_get_flag( j, JOB_CONTROL ) ); job_set_flag( j, JOB_TERMINAL, job_get_flag( j, JOB_CONTROL ) \ diff --git a/parser.h b/parser.h index 9a5bad612..6d908de6e 100644 --- a/parser.h +++ b/parser.h @@ -9,7 +9,6 @@ #include "proc.h" #include "util.h" -#include "parser.h" #include "event.h" #include "function.h" #include @@ -295,6 +294,9 @@ class parser_t { /** String index where the current job started. */ int job_start_pos; + /** The jobs associated with this parser */ + job_list_t my_job_list; + /** Keeps track of how many recursive eval calls have been made. Eval doesn't call itself directly, recursion happens on blocks and on @@ -381,46 +383,47 @@ class parser_t { */ const wchar_t *current_line(); - /** - Returns the current line number - */ + /** Returns the current line number */ int get_lineno() const; - /** - Returns the current position in the latest string of the tokenizer. - */ + /** Returns the current position in the latest string of the tokenizer. */ int get_pos() const; - /** - Returns the position where the current job started in the latest string of the tokenizer. - */ + /** Returns the position where the current job started in the latest string of the tokenizer. */ int get_job_pos() const; - /** - Set the current position in the latest string of the tokenizer. - */ + /** Set the current position in the latest string of the tokenizer. */ void set_pos( int p); - /** - Get the string currently parsed - */ + /** Get the string currently parsed */ const wchar_t *get_buffer() const; + + /** Get the list of jobs */ + job_list_t &job_list() { return my_job_list; } - /** - Create block of specified type - */ + /** Create block of specified type */ void push_block( int type); - /** - Remove the outermost block namespace - */ + /** Remove the outermost block namespace */ void pop_block(); - /** - Return a description of the given blocktype - */ + /** Return a description of the given blocktype */ const wchar_t *get_block_desc( int block ) const; + /** Create a job */ + job_t *job_create(); + + /** Removes a job */ + bool job_remove(job_t *job); + + /** Promotes a job to the front of the list */ + void job_promote(job_t *job); + + /** Return the job with the specified job id. If id is 0 or less, return the last job used. */ + job_t *job_get(int job_id); + + /** Returns the job with the given pid */ + job_t *job_get_from_pid( int pid ); /** Test if the specified string can be parsed, or if more bytes need diff --git a/proc.cpp b/proc.cpp index 22c6fc601..837aae19c 100644 --- a/proc.cpp +++ b/proc.cpp @@ -95,12 +95,28 @@ static int last_status=0; */ static sig_atomic_t got_signal=0; -static std::list s_job_list; - -job_list_t &job_list(void) { - return s_job_list; +bool job_list_is_empty(void) +{ + ASSERT_IS_MAIN_THREAD(); + return parser_t::principal_parser().job_list().empty(); } +void job_iterator_t::reset() +{ + this->current = job_list->begin(); + this->end = job_list->end(); +} + +job_iterator_t::job_iterator_t(job_list_t &jobs) : job_list(&jobs) { + this->reset(); +} + + +job_iterator_t::job_iterator_t() : job_list(&parser_t::principal_parser().job_list()) { + this->reset(); +} + + int is_interactive_session=0; int is_subshell=0; int is_block=0; @@ -140,26 +156,14 @@ void proc_init() */ static int job_remove( job_t *j ) { - job_list_t &jobs = job_list(); - job_list_t::iterator iter = find(jobs.begin(), jobs.end(), j); - if (iter != jobs.end()) { - jobs.erase(iter); - return 1; - } else { - debug( 1, _( L"Job inconsistency" ) ); - sanity_lose(); - return 0; - } + ASSERT_IS_MAIN_THREAD(); + return parser_t::principal_parser().job_remove(j); } void job_promote(job_t *job) { - job_list_t &jobs = job_list(); - job_list_t::iterator loc = find(jobs.begin(), jobs.end(), job); - assert(loc != jobs.end()); - - /* Move the job to the beginning */ - jobs.splice(jobs.begin(), jobs, loc); + ASSERT_IS_MAIN_THREAD(); + parser_t::principal_parser().job_promote(job); } @@ -201,13 +205,13 @@ void process_t::set_argv(const wcstring_list_t &argv) { void proc_destroy() { event.arguments.reset(NULL); - job_list_t &jobs = job_list(); + job_list_t &jobs = parser_t::principal_parser().job_list(); while( ! jobs.empty() ) { job_t *job = jobs.front(); debug( 2, L"freeing leaked job %ls", job->command_cstr() ); job_free( job ); - } + } } void proc_set_last_status( int s ) @@ -220,47 +224,59 @@ int proc_get_last_status() return last_status; } -job_t *job_create() -{ - int free_id=1; - - while( job_get( free_id ) != 0 ) - free_id++; - - job_t *res = new job_t(free_id); - job_list().push_front(res); - - job_set_flag( res, - JOB_CONTROL, - (job_control_mode==JOB_CONTROL_ALL) || - ((job_control_mode == JOB_CONTROL_INTERACTIVE) && (is_interactive)) ); +/* Basic thread safe job IDs. The vector consumed_job_ids has a true value wherever the job ID corresponding to that slot is in use. The job ID corresponding to slot 0 is 1. */ +static pthread_mutex_t job_id_lock = PTHREAD_MUTEX_INITIALIZER; +static std::vector consumed_job_ids; -// if( res->job_id > 2 ) -// fwprintf( stderr, L"Create job %d\n", res->job_id ); - return res; +job_id_t acquire_job_id(void) +{ + scoped_lock lock(job_id_lock); + + /* Find the index of the first 0 slot */ + std::vector::iterator slot = std::find(consumed_job_ids.begin(), consumed_job_ids.end(), false); + if (slot != consumed_job_ids.end()) + { + /* We found a slot. Note that slot 0 corresponds to job ID 1. */ + *slot = true; + return slot - consumed_job_ids.begin() + 1; + } + else + { + /* We did not find a slot; create a new slot. The size of the vector is now the job ID (since it is one larger than the slot). */ + consumed_job_ids.push_back(true); + return (job_id_t)consumed_job_ids.size(); + } } - -job_t *job_get( int id ) +void release_job_id(job_id_t jid) { - job_iterator_t jobs; - job_t *job; - while ((job = jobs.next())) { - if( id <= 0 || job->job_id == id) - return job; - } - return NULL; + assert(jid > 0); + scoped_lock lock(job_id_lock); + size_t slot = (size_t)(jid - 1), count = consumed_job_ids.size(); + + /* Make sure this slot is within our vector and is currently set to consumed */ + assert(slot < count); + assert(consumed_job_ids.at(slot) == true); + + /* Clear it and then resize the vector to eliminate unused trailing job IDs */ + consumed_job_ids.at(slot) = false; + while (count--) { + if (consumed_job_ids.at(count)) + break; + } + consumed_job_ids.resize(count + 1); +} + +job_t *job_get( job_id_t id ) +{ + ASSERT_IS_MAIN_THREAD(); + return parser_t::principal_parser().job_get(id); } job_t *job_get_from_pid( int pid ) { - job_iterator_t jobs; - job_t *job; - while ((job = jobs.next())) { - if( job->pgid == pid ) - return job; - } - return 0; + ASSERT_IS_MAIN_THREAD(); + return parser_t::principal_parser().job_get_from_pid(pid); } @@ -308,7 +324,7 @@ void job_set_flag( job_t *j, int flag, int set ) j->flags = j->flags & (0xffffffff ^ flag); } -int job_get_flag( job_t *j, int flag ) +int job_get_flag( const job_t *j, int flag ) { return j->flags&flag?1:0; } @@ -352,7 +368,7 @@ int job_signal( job_t *j, int signal ) Store the status of the process pid that was returned by waitpid. Return 0 if all went well, nonzero otherwise. */ -static void mark_process_status( job_t *j, +static void mark_process_status( const job_t *j, process_t *p, int status ) { @@ -399,7 +415,7 @@ static void mark_process_status( job_t *j, static void handle_child_status( pid_t pid, int status ) { int found_proc = 0; - job_t *j=0; + const job_t *j=0; process_t *p=0; // char mess[MESS_SIZE]; found_proc = 0; diff --git a/proc.h b/proc.h index df05d96e4..41a263ead 100644 --- a/proc.h +++ b/proc.h @@ -279,11 +279,15 @@ class process_t A struct represeting a job. A job is basically a pipeline of one or more processes and a couple of flags. */ +typedef int job_id_t; +job_id_t acquire_job_id(void); +void release_job_id(job_id_t jobid); + class job_t { public: - job_t(int jobid) : + job_t(job_id_t jobid) : command(), first_process(NULL), pgid(0), @@ -303,6 +307,7 @@ class job_t delete data; data = tmp; } + release_job_id(job_id); } @@ -339,7 +344,7 @@ class job_t unique identifier of the job within this shell, and is used e.g. in process expansion. */ - const int job_id; + const job_id_t job_id; /** List of all IO redirections for this job. This linked list is allocated via new, and owned by the object, which should delete them. @@ -384,20 +389,17 @@ extern int is_login; extern int is_event; -/** List of all living jobs */ typedef std::list job_list_t; -job_list_t &job_list(void); + +bool job_list_is_empty(void); /** A class to aid iteration over jobs list */ class job_iterator_t { + job_list_t * const job_list; job_list_t::iterator current, end; public: - void reset(void) { - job_list_t &jobs = job_list(); - this->current = jobs.begin(); - this->end = jobs.end(); - } + void reset(void); job_t *next() { job_t *job = NULL; @@ -408,9 +410,8 @@ class job_iterator_t { return job; } - job_iterator_t() { - this->reset(); - } + job_iterator_t(job_list_t &jobs); + job_iterator_t(); }; /** @@ -452,7 +453,7 @@ void job_set_flag( job_t *j, int flag, int set ); /** Returns one if the specified flag is set in the specified job, 0 otherwise. */ -int job_get_flag( job_t *j, int flag ); +int job_get_flag( const job_t *j, int flag ); /** Sets the status of the last process to exit @@ -483,7 +484,7 @@ job_t *job_create(); Return the job with the specified job id. If id is 0 or less, return the last job used. */ -job_t *job_get(int id); +job_t *job_get(job_id_t id); /** diff --git a/reader.cpp b/reader.cpp index 8373c73a3..553b405fa 100644 --- a/reader.cpp +++ b/reader.cpp @@ -2481,7 +2481,7 @@ static void reader_super_highlight_me_plenty( int match_highlight_pos ) int exit_status() { if( get_is_interactive() ) - return job_list().empty() && data->end_loop; + return job_list_is_empty() && data->end_loop; else return end_loop; }