Switch builtin execution to the performer model

In preparation for concurrent execution, introduce a
`get_performer_for_builtin` function. This function itself returns a
function, which when called will run the builtin. The idea is that the
function may be called on a background thread (but not in this commit).
This commit is contained in:
ridiculousfish 2021-02-14 14:09:59 -08:00
parent fb92ad946b
commit 48868e5667
2 changed files with 101 additions and 77 deletions

View file

@ -447,71 +447,18 @@ static launch_result_t fork_child_for_process(const std::shared_ptr<job_t> &job,
return launch_result_t::ok;
}
/// Execute an internal builtin. Given a parser and a builtin process, execute the builtin with the
/// given streams. Infer stdin from the IO chain. An error return here indicates that the process
/// failed to launch, and the rest of the pipeline should be cancelled.
static void exec_internal_builtin_proc(parser_t &parser, process_t *p,
const io_chain_t &proc_io_chain, io_streams_t &streams) {
assert(p->type == process_type_t::builtin && "Process must be a builtin");
int local_builtin_stdin = STDIN_FILENO;
// Figure out what fd to use for the builtin's stdin.
if (const auto in = proc_io_chain.io_for_fd(STDIN_FILENO)) {
// Ignore fd redirections from an fd other than the
// standard ones. e.g. in source <&3 don't actually read from fd 3,
// which is internal to fish. We still respect this redirection in
// that we pass it on as a block IO to the code that source runs,
// and therefore this is not an error.
bool ignore_redirect = in->io_mode == io_mode_t::fd && in->source_fd >= 3;
if (!ignore_redirect) {
local_builtin_stdin = in->source_fd;
}
}
// Determine if we have a "direct" redirection for stdin.
bool stdin_is_directly_redirected = false;
if (!p->is_first_in_job) {
// We must have a pipe
stdin_is_directly_redirected = true;
} else {
// We are not a pipe. Check if there is a redirection local to the process
// that's not io_mode_t::close.
for (const auto &redir : p->redirection_specs()) {
if (redir.fd == STDIN_FILENO && !redir.is_close()) {
stdin_is_directly_redirected = true;
break;
}
}
}
// Pull out the IOs for stdout and stderr.
auto out_io = proc_io_chain.io_for_fd(STDOUT_FILENO);
auto err_io = proc_io_chain.io_for_fd(STDERR_FILENO);
// Set up our streams.
streams.stdin_fd = local_builtin_stdin;
streams.out_is_redirected = out_io != nullptr;
streams.err_is_redirected = err_io != nullptr;
streams.out_is_piped = (out_io != nullptr && out_io->io_mode == io_mode_t::pipe);
streams.err_is_piped = (err_io != nullptr && err_io->io_mode == io_mode_t::pipe);
streams.stdin_is_directly_redirected = stdin_is_directly_redirected;
streams.io_chain = &proc_io_chain;
// Note this call may block for a long time while the builtin runs.
p->status = builtin_run(parser, p->argv(), streams);
}
/// \return an newly allocated output stream for the given fd, which is typically stdout or stderr.
/// This inspects the io_chain and decides what sort of output stream to return.
/// If \p piped_output_needs_buffering is set, and if the output is going to a pipe, then the other
/// end then synchronously writing to the pipe risks deadlock, so we must buffer it.
static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(
static std::shared_ptr<output_stream_t> create_output_stream_for_builtin(
int fd, const io_chain_t &io_chain, bool piped_output_needs_buffering) {
using std::make_shared;
const shared_ptr<const io_data_t> io = io_chain.io_for_fd(fd);
if (io == nullptr) {
// Common case of no redirections.
// Just write to the fd directly.
return make_unique<fd_output_stream_t>(fd);
return make_shared<fd_output_stream_t>(fd);
}
switch (io->io_mode) {
case io_mode_t::bufferfill: {
@ -526,24 +473,24 @@ static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(
case io_mode_t::close:
// Like 'echo foo >&-'
return make_unique<null_output_stream_t>();
return make_shared<null_output_stream_t>();
case io_mode_t::file:
// Output is to a file which has been opened.
return make_unique<fd_output_stream_t>(io->source_fd);
return make_shared<fd_output_stream_t>(io->source_fd);
case io_mode_t::pipe:
// Output is to a pipe. We may need to buffer.
if (piped_output_needs_buffering) {
return make_unique<string_output_stream_t>();
return make_shared<string_output_stream_t>();
} else {
return make_unique<fd_output_stream_t>(io->source_fd);
return make_shared<fd_output_stream_t>(io->source_fd);
}
case io_mode_t::fd:
// This is a case like 'echo foo >&5'
// It's uncommon and unclear what should happen.
return make_unique<string_output_stream_t>();
return make_shared<string_output_stream_t>();
}
DIE("Unreachable");
}
@ -551,17 +498,18 @@ static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(
/// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections
/// given in io_chain.
static void handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t> &j, process_t *p,
const io_chain_t &io_chain, const io_streams_t &streams) {
const io_chain_t &io_chain, const output_stream_t &out,
const output_stream_t &err) {
assert(p->type == process_type_t::builtin && "Process is not a builtin");
// Mark if we discarded output.
if (streams.out.discarded() || streams.err.discarded()) {
if (out.discarded() || err.discarded()) {
p->status = proc_status_t::from_exit_code(STATUS_READ_TOO_MUCH);
}
// Figure out any data remaining to write. We may have none, in which case we can short-circuit.
std::string outbuff = wcs2string(streams.out.contents());
std::string errbuff = wcs2string(streams.err.contents());
std::string outbuff = wcs2string(out.contents());
std::string errbuff = wcs2string(err.contents());
// Some historical behavior.
if (!outbuff.empty()) fflush(stdout);
@ -688,8 +636,7 @@ static void function_restore_environment(parser_t &parser, const block_t *block)
}
// The "performer" function of a block or function process.
// This accepts a place to execute as \p parser, and a parent job as \p parent, and then executes
// the result, returning a status.
// This accepts a place to execute as \p parser and then executes the result, returning a status.
// This is factored out in this funny way in preparation for concurrent execution.
using proc_performer_t = std::function<proc_status_t(parser_t &parser)>;
@ -777,6 +724,88 @@ static launch_result_t exec_block_or_func_process(parser_t &parser, const std::s
return launch_result_t::ok;
}
static proc_performer_t get_performer_for_builtin(
process_t *p, job_t *job, const io_chain_t &io_chain,
const std::shared_ptr<output_stream_t> output_stream,
const std::shared_ptr<output_stream_t> errput_stream) {
assert(p->type == process_type_t::builtin && "Process must be a builtin");
// Determine if we have a "direct" redirection for stdin.
bool stdin_is_directly_redirected = false;
if (!p->is_first_in_job) {
// We must have a pipe
stdin_is_directly_redirected = true;
} else {
// We are not a pipe. Check if there is a redirection local to the process
// that's not io_mode_t::close.
for (const auto &redir : p->redirection_specs()) {
if (redir.fd == STDIN_FILENO && !redir.is_close()) {
stdin_is_directly_redirected = true;
break;
}
}
}
// Pull out some fields which we want to copy. We don't want to store the process or job in the
// returned closure.
job_group_ref_t job_group = job->group;
const wcstring_list_t &argv = p->argv();
// Be careful to not capture p or j by value, as the intent is that this may be run on another
// thread.
return [=](parser_t &parser) {
auto out_io = io_chain.io_for_fd(STDOUT_FILENO);
auto err_io = io_chain.io_for_fd(STDERR_FILENO);
// Figure out what fd to use for the builtin's stdin.
int local_builtin_stdin = STDIN_FILENO;
if (const auto in = io_chain.io_for_fd(STDIN_FILENO)) {
// Ignore fd redirections from an fd other than the
// standard ones. e.g. in source <&3 don't actually read from fd 3,
// which is internal to fish. We still respect this redirection in
// that we pass it on as a block IO to the code that source runs,
// and therefore this is not an error.
bool ignore_redirect = in->io_mode == io_mode_t::fd && in->source_fd >= 3;
if (!ignore_redirect) {
local_builtin_stdin = in->source_fd;
}
}
// Populate our io_streams_t. This is a bag of information for the builtin.
io_streams_t streams{*output_stream, *errput_stream};
streams.job_group = job_group;
streams.stdin_fd = local_builtin_stdin;
streams.stdin_is_directly_redirected = stdin_is_directly_redirected;
streams.out_is_redirected = out_io != nullptr;
streams.err_is_redirected = err_io != nullptr;
streams.out_is_piped = (out_io && out_io->io_mode == io_mode_t::pipe);
streams.err_is_piped = (err_io && err_io->io_mode == io_mode_t::pipe);
streams.io_chain = &io_chain;
// Execute the builtin.
return builtin_run(parser, argv, streams);
};
}
/// Executes a builtin "process".
static launch_result_t exec_builtin_process(parser_t &parser, const std::shared_ptr<job_t> &j,
process_t *p, const io_chain_t &io_chain,
bool piped_output_needs_buffering) {
assert(p->type == process_type_t::builtin && "Process is not a builtin");
std::shared_ptr<output_stream_t> out =
create_output_stream_for_builtin(STDOUT_FILENO, io_chain, piped_output_needs_buffering);
std::shared_ptr<output_stream_t> err =
create_output_stream_for_builtin(STDERR_FILENO, io_chain, piped_output_needs_buffering);
if (proc_performer_t performer = get_performer_for_builtin(p, j.get(), io_chain, out, err)) {
p->status = performer(parser);
} else {
return launch_result_t::failed;
}
handle_builtin_output(parser, j, p, io_chain, *out, *err);
return launch_result_t::ok;
}
/// Executes a process \p \p in \p job, using the pipes \p pipes (which may have invalid fds if this
/// is the first or last process).
/// \p deferred_pipes represents the pipes from our deferred process; if set ensure they get closed
@ -894,15 +923,10 @@ static launch_result_t exec_process_in_job(parser_t &parser, process_t *p,
}
case process_type_t::builtin: {
std::unique_ptr<output_stream_t> output_stream = create_output_stream_for_builtin(
STDOUT_FILENO, process_net_io_chain, piped_output_needs_buffering);
std::unique_ptr<output_stream_t> errput_stream = create_output_stream_for_builtin(
STDERR_FILENO, process_net_io_chain, piped_output_needs_buffering);
io_streams_t builtin_io_streams{*output_stream, *errput_stream};
builtin_io_streams.job_group = j->group;
exec_internal_builtin_proc(parser, p, process_net_io_chain, builtin_io_streams);
handle_builtin_output(parser, j, p, process_net_io_chain, builtin_io_streams);
if (exec_builtin_process(parser, j, p, process_net_io_chain,
piped_output_needs_buffering) == launch_result_t::failed) {
return launch_result_t::failed;
}
break;
}

View file

@ -1124,7 +1124,7 @@ static bool detect_errors_in_decorated_statement(const wcstring &buff_src,
bool first_arg_is_help = false;
if (const auto *arg = get_first_arg(dst.args_or_redirs)) {
const wcstring &arg_src = arg->source(buff_src, storage);
first_arg_is_help = parse_util_argument_is_help(arg_src.c_str());
first_arg_is_help = parse_util_argument_is_help(arg_src);
}
// Get the statement we are part of.