io_buffer_t becomes io_bufferfill_t

This makes some significant architectural improvements to io_pipe_t and
io_buffer_t.

Prior to this fix, io_buffer_t subclassed io_pipe_t. io_buffer_t is now
replaced with a new class, io_bufferfill_t, which does not subclass io_pipe_t.

io_pipe_t no longer remembers both fds. Instead it has an autoclose_fd_t,
so that the file descriptor ownership is clear.
ridiculousfish 2019-01-31 16:05:42 -08:00
parent 084ff64f4f
commit 178b72b2fd
8 changed files with 289 additions and 341 deletions


@@ -786,6 +786,9 @@ class autoclose_fd_t {
         fd_ = fd;
     }
 
+    // \return if this has a valid fd.
+    bool valid() const { return fd_ >= 0; }
+
     autoclose_fd_t(const autoclose_fd_t &) = delete;
     void operator=(const autoclose_fd_t &) = delete;
     autoclose_fd_t(autoclose_fd_t &&rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; }
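
For context, a minimal sketch of the RAII wrapper this change builds on. Only fd(), valid(), and the move-only members above are taken from the source; the constructor and destructor here are assumptions, implied by the class's name and by the commit message's goal of clear fd ownership:

#include <unistd.h>

// Sketch: a move-only owner of a file descriptor. Exactly one instance owns a
// given fd at a time, so close() runs exactly once, in the destructor.
class autoclose_fd_t {
    int fd_{-1};

   public:
    explicit autoclose_fd_t(int fd = -1) : fd_(fd) {}  // assumed constructor
    int fd() const { return fd_; }
    bool valid() const { return fd_ >= 0; }

    autoclose_fd_t(const autoclose_fd_t &) = delete;
    void operator=(const autoclose_fd_t &) = delete;
    autoclose_fd_t(autoclose_fd_t &&rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; }

    ~autoclose_fd_t() {
        if (fd_ >= 0) close(fd_);  // assumed: "autoclose" means close on destruction
    }
};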


@@ -81,25 +81,6 @@ void exec_close(int fd) {
     }
 }
 
-int exec_pipe(int fd[2]) {
-    ASSERT_IS_MAIN_THREAD();
-
-    int res;
-    while ((res = pipe(fd))) {
-        if (errno != EINTR) {
-            return res;  // caller will call wperror
-        }
-    }
-
-    debug(4, L"Created pipe using fds %d and %d", fd[0], fd[1]);
-
-    // Pipes ought to be cloexec. Pipes are dup2'd the corresponding fds; the resulting fds are not
-    // cloexec.
-    set_cloexec(fd[0]);
-    set_cloexec(fd[1]);
-    return res;
-}
-
 /// Returns true if the redirection is a file redirection to a file other than /dev/null.
 static bool redirection_is_to_real_file(const io_data_t *io) {
     bool result = false;
@@ -246,8 +227,8 @@ static bool io_transmogrify(const io_chain_t &in_chain, io_chain_t *out_chain,
         switch (in->io_mode) {
             case io_mode_t::pipe:
+            case io_mode_t::bufferfill:
             case io_mode_t::fd:
-            case io_mode_t::buffer:
             case io_mode_t::close: {
                 // These redirections don't need transmogrification. They can be passed through.
                 out = in;
@@ -424,12 +405,12 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
                                        const io_chain_t &proc_io_chain, io_streams_t &streams) {
     assert(p->type == INTERNAL_BUILTIN && "Process must be a builtin");
     int local_builtin_stdin = STDIN_FILENO;
-    bool close_stdin = false;
+    autoclose_fd_t locally_opened_stdin{};
 
     // If this is the first process, check the io redirections and see where we should
     // be reading from.
     if (pipe_read) {
-        local_builtin_stdin = pipe_read->pipe_fd[0];
+        local_builtin_stdin = pipe_read->pipe_fd();
    } else if (const auto in = proc_io_chain.get_io_for_fd(STDIN_FILENO)) {
         switch (in->io_mode) {
             case io_mode_t::fd: {
@@ -448,20 +429,20 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
             }
             case io_mode_t::pipe: {
                 const io_pipe_t *in_pipe = static_cast<const io_pipe_t *>(in.get());
-                local_builtin_stdin = in_pipe->pipe_fd[0];
+                if (in_pipe->fd == STDIN_FILENO) {
+                    local_builtin_stdin = in_pipe->pipe_fd();
+                }
                 break;
             }
             case io_mode_t::file: {
-                // Do not set CLO_EXEC because child needs access.
                 const io_file_t *in_file = static_cast<const io_file_t *>(in.get());
-                local_builtin_stdin = open(in_file->filename_cstr, in_file->flags, OPEN_MASK);
-                if (local_builtin_stdin == -1) {
+                locally_opened_stdin =
+                    autoclose_fd_t{open(in_file->filename_cstr, in_file->flags, OPEN_MASK)};
+                if (!locally_opened_stdin.valid()) {
                     debug(1, FILE_ERROR, in_file->filename_cstr);
                     wperror(L"open");
-                } else {
-                    close_stdin = true;
                 }
+                local_builtin_stdin = locally_opened_stdin.fd();
                 break;
             }
             case io_mode_t::close: {
@@ -517,10 +498,6 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
     // execution so as not to confuse some job-handling builtins.
     j->set_flag(job_flag_t::FOREGROUND, fg);
 
-    // If stdin has been redirected, close the redirection stream.
-    if (close_stdin) {
-        exec_close(local_builtin_stdin);
-    }
-
     return true;  // "success"
 }
@@ -548,7 +525,11 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
     if (!must_fork && p->is_last_in_job) {
         // We are handling reads directly in the main loop. Note that we may still end
         // up forking.
-        const bool stdout_is_to_buffer = stdout_io && stdout_io->io_mode == io_mode_t::buffer;
+        const bool stdout_is_bufferfill =
+            (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill);
+        const std::shared_ptr<io_buffer_t> stdout_buffer =
+            stdout_is_bufferfill ? static_cast<io_bufferfill_t *>(stdout_io.get())->buffer()
+                                 : nullptr;
         const bool no_stdout_output = stdout_stream.empty();
         const bool no_stderr_output = stderr_stream.empty();
         const bool stdout_discarded = stdout_stream.buffer().discarded();
@@ -558,7 +539,7 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
             // need to fork or even output anything.
             debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0());
             fork_was_skipped = true;
-        } else if (no_stderr_output && stdout_is_to_buffer) {
+        } else if (no_stderr_output && stdout_buffer) {
             // The builtin produced no stderr, and its stdout is going to an
             // internal buffer. There is no need to fork. This helps out the
             // performance quite a bit in complex completion code.
@@ -570,8 +551,7 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
             // also produce stderr.
             debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0());
 
-            io_buffer_t *io_buffer = static_cast<io_buffer_t *>(stdout_io.get());
-            io_buffer->append_from_stream(stdout_stream);
+            stdout_buffer->append_from_stream(stdout_stream);
             fork_was_skipped = true;
         } else if (stdout_io.get() == NULL && stderr_io.get() == NULL) {
             // We are writing to normal stdout and stderr. Just do it - no need to fork.
@@ -749,20 +729,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
            "Unexpected process type");
 
     // Create an output buffer if we're piping to another process.
-    shared_ptr<io_buffer_t> block_output_io_buffer{};
+    shared_ptr<io_bufferfill_t> block_output_bufferfill{};
     if (!p->is_last_in_job) {
         // Be careful to handle failure, e.g. too many open fds.
-        block_output_io_buffer = io_buffer_t::create(STDOUT_FILENO, user_ios);
-        if (!block_output_io_buffer) {
+        block_output_bufferfill = io_bufferfill_t::create(user_ios);
+        if (!block_output_bufferfill) {
             job_mark_process_as_failed(j, p);
             return false;
-        } else {
-            // This looks sketchy, because we're adding this io buffer locally - they
-            // aren't in the process or job redirection list. Therefore select_try won't
-            // be able to read them. However we call block_output_io_buffer->read()
-            // below, which reads until EOF. So there's no need to select on this.
-            io_chain.push_back(block_output_io_buffer);
         }
+        // Teach the job about its bufferfill, and add it to our io chain.
+        io_chain.push_back(block_output_bufferfill);
     }
 
     if (p->type == INTERNAL_FUNCTION) {
@@ -792,10 +768,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
 
     int status = proc_get_last_status();
 
-    // Handle output from a block or function. This usually means do nothing, but in the
-    // case of pipes, we have to buffer such io, since otherwise the internal pipe
-    // buffer might overflow.
-    if (!block_output_io_buffer.get()) {
+    // If we have a block output buffer, populate it now.
+    if (!block_output_bufferfill) {
         // No buffer, so we exit directly. This means we have to manually set the exit
         // status.
         if (p->is_last_in_job) {
@@ -804,11 +778,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
         p->completed = 1;
         return true;
     }
+    assert(block_output_bufferfill && "Must have a block output bufferfiller");
 
-    // Here we must have a non-NULL block_output_io_buffer.
-    assert(block_output_io_buffer.get() != NULL);
-    io_chain.remove(block_output_io_buffer);
-    block_output_io_buffer->read();
+    // Remove our write pipe and forget it. This may close the pipe, unless another thread has
+    // claimed it (background write) or another process has inherited it.
+    auto block_output_buffer = block_output_bufferfill->buffer();
+    io_chain.remove(block_output_bufferfill);
+    block_output_bufferfill.reset();
+
+    // Make the buffer populate itself from whatever was written to the write pipe.
+    block_output_buffer->read_to_wouldblock();
 
     // Resolve our IO chain to a sequence of dup2s.
     auto dup2s = dup2_list_t::resolve_chain(io_chain);
@@ -816,7 +795,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
         return false;
     }
 
-    const std::string buffer_contents = block_output_io_buffer->buffer().newline_serialized();
+    const std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
     const char *buffer = buffer_contents.data();
     size_t count = buffer_contents.size();
     if (count > 0) {
@@ -824,7 +803,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
         const char *fork_reason =
             p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io";
         if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] {
-                exec_write_and_exit(block_output_io_buffer->fd, buffer, count, status);
+                exec_write_and_exit(STDOUT_FILENO, buffer, count, status);
             })) {
             return false;
         }
@@ -844,19 +823,28 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
                                 autoclose_fd_t pipe_current_read,
                                 autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios,
                                 size_t stdout_read_limit) {
-    // The IO chain for this process. It starts with the block IO, then pipes, and then gets any
-    // from the process.
-    io_chain_t process_net_io_chain = j->block_io_chain();
-
-    // See if we need a pipe.
+    // The pipe this command will write to (if any).
+    shared_ptr<io_pipe_t> pipe_write;
+    // The pipe this command will read from (if any).
+    shared_ptr<io_pipe_t> pipe_read;
+
+    // See if we need a pipe for the next command.
     const bool pipes_to_next_command = !p->is_last_in_job;
+    if (pipes_to_next_command) {
+        // Construct our pipes.
+        auto local_pipes = make_autoclose_pipes(all_ios);
+        if (!local_pipes) {
+            debug(1, PIPE_ERROR);
+            wperror(L"pipe");
+            job_mark_process_as_failed(j, p);
+            return false;
+        }
+        pipe_write = std::make_shared<io_pipe_t>(p->pipe_write_fd, false /* not input */,
+                                                 std::move(local_pipes->write));
+        *out_pipe_next_read = std::move(local_pipes->read);
+    }
 
-    // The write end of any pipe we create.
-    autoclose_fd_t pipe_current_write{};
-
-    // The pipes the current process write to and read from. Unfortunately these can't be just
-    // allocated on the stack, since j->io wants shared_ptr.
-    //
     // The write pipe (destined for stdout) needs to occur before redirections. For example,
     // with a redirection like this:
     //
@@ -884,12 +872,10 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
     //
     // which depends on the redirection being evaluated before the pipe. So the write end of the
     // pipe comes first, the read pipe of the pipe comes last. See issue #966.
-    shared_ptr<io_pipe_t> pipe_write;
-    shared_ptr<io_pipe_t> pipe_read;
 
-    // Write pipe goes first.
-    if (pipes_to_next_command) {
-        pipe_write.reset(new io_pipe_t(p->pipe_write_fd, false));
+    // The IO chain for this process.
+    io_chain_t process_net_io_chain = j->block_io_chain();
+    if (pipe_write) {
         process_net_io_chain.push_back(pipe_write);
     }
@@ -897,10 +883,9 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
     process_net_io_chain.append(p->io_chain());
 
     // Read pipe goes last.
-    if (!p->is_first_in_job) {
-        pipe_read.reset(new io_pipe_t(STDIN_FILENO, true));
-        // Record the current read in pipe_read.
-        pipe_read->pipe_fd[0] = pipe_current_read.fd();
+    if (pipe_current_read.valid()) {
+        pipe_read = std::make_shared<io_pipe_t>(STDIN_FILENO, true /* input */,
+                                                std::move(pipe_current_read));
         process_net_io_chain.push_back(pipe_read);
     }
@@ -918,36 +903,6 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
         parser.vars().export_arr();
     }
 
-    // Set up fds that will be used in the pipe.
-    if (pipes_to_next_command) {
-        // debug( 1, L"%ls|%ls" , p->argv[0], p->next->argv[0]);
-        int local_pipe[2] = {-1, -1};
-        if (exec_pipe(local_pipe) == -1) {
-            debug(1, PIPE_ERROR);
-            wperror(L"pipe");
-            job_mark_process_as_failed(j, p);
-            return false;
-        }
-
-        // Ensure our pipe fds not conflict with any fd redirections. E.g. if the process is
-        // like 'cat <&5' then fd 5 must not be used by the pipe.
-        if (!pipe_avoid_conflicts_with_io_chain(local_pipe, all_ios)) {
-            // We failed. The pipes were closed for us.
-            wperror(L"dup");
-            job_mark_process_as_failed(j, p);
-            return false;
-        }
-
-        // This tells the redirection about the fds, but the redirection does not close them.
-        assert(local_pipe[0] >= 0);
-        assert(local_pipe[1] >= 0);
-        memcpy(pipe_write->pipe_fd, local_pipe, sizeof(int) * 2);
-
-        // Record our pipes.
-        pipe_current_write.reset(local_pipe[1]);
-        out_pipe_next_read->reset(local_pipe[0]);
-    }
-
     // Execute the process.
     switch (p->type) {
         case INTERNAL_FUNCTION:
@@ -1008,18 +963,13 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
         }
     }
 
-    // Verify that all io_mode_t::buffers are output. We used to support a (single, hacked-in)
-    // magical input io_mode_t::buffer used by fish_pager, but now the claim is that there are no
-    // more clients and it is removed. This assertion double-checks that.
     size_t stdout_read_limit = 0;
     const io_chain_t all_ios = j->all_io_redirections();
-    for (size_t idx = 0; idx < all_ios.size(); idx++) {
-        const shared_ptr<io_data_t> &io = all_ios.at(idx);
-
-        if ((io->io_mode == io_mode_t::buffer)) {
-            io_buffer_t *io_buffer = static_cast<io_buffer_t *>(io.get());
-            assert(!io_buffer->is_input);
-            stdout_read_limit = io_buffer->buffer().limit();
+    for (auto &io : all_ios) {
+        if ((io->io_mode == io_mode_t::bufferfill)) {
+            // The read limit is dictated by the last bufferfill.
+            const auto *bf = static_cast<io_bufferfill_t *>(io.get());
+            stdout_read_limit = bf->buffer()->buffer().limit();
         }
     }
@@ -1028,21 +978,6 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
         DIE("this should be unreachable");
     }
 
-    // We may have block IOs that conflict with fd redirections. For example, we may have a command
-    // with a redireciton like <&3; we may also have chosen 3 as the fd for our pipe. Ensure we have
-    // no conflicts.
-    for (const auto io : all_ios) {
-        if (io->io_mode == io_mode_t::buffer) {
-            auto *io_buffer = static_cast<io_buffer_t *>(io.get());
-            if (!io_buffer->avoid_conflicts_with_io_chain(all_ios)) {
-                // We could not avoid conflicts, probably due to fd exhaustion. Mark an error.
-                exec_error = true;
-                job_mark_process_as_failed(j, j->processes.front().get());
-                break;
-            }
-        }
-    }
-
     // This loop loops over every process_t in the job, starting it as appropriate. This turns out
     // to be rather complex, since a process_t can be one of many rather different things.
     //
@@ -1098,29 +1033,30 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin
     // IO buffer creation may fail (e.g. if we have too many open files to make a pipe), so this may
     // be null.
-    const shared_ptr<io_buffer_t> io_buffer(
-        io_buffer_t::create(STDOUT_FILENO, io_chain_t(), is_subcmd ? read_byte_limit : 0));
-    if (io_buffer.get() != NULL) {
+    size_t read_limit = is_subcmd ? read_byte_limit : 0;
+    std::shared_ptr<io_buffer_t> buffer;
+    if (auto bufferfill = io_bufferfill_t::create(io_chain_t{}, read_limit)) {
         parser_t &parser = parser_t::principal_parser();
-        if (parser.eval(cmd, io_chain_t{io_buffer}, SUBST) == 0) {
+        if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) {
             subcommand_status = proc_get_last_status();
         }
-
-        io_buffer->read();
+        buffer = bufferfill->buffer();
+        bufferfill.reset();
+        buffer->read_to_wouldblock();
     }
 
-    if (io_buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;
+    if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;
 
     // If the caller asked us to preserve the exit status, restore the old status. Otherwise set the
     // status of the subcommand.
     proc_set_last_status(apply_exit_status ? subcommand_status : prev_status);
     is_subshell = prev_subshell;
 
-    if (lst == NULL || io_buffer.get() == NULL) {
+    if (lst == NULL || !buffer) {
         return subcommand_status;
     }
 
     // Walk over all the elements.
-    for (const auto &elem : io_buffer->buffer().elements()) {
+    for (const auto &elem : buffer->buffer().elements()) {
         if (elem.is_explicitly_separated()) {
             // Just append this one.
             lst->push_back(str2wcstring(elem.contents));
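
Taken together, the exec.cpp changes invert the old flow: instead of creating raw fds late and memcpy'ing them into an existing io_pipe_t, the pipe is made up front and each end is moved into exactly one owner. A condensed view of the new per-process wiring, stitched together from the hunks above (a summary, not a new excerpt from the commit):

// 1. Make both ends of the pipe, steered away from any fds the job's
//    redirections already use.
auto local_pipes = make_autoclose_pipes(all_ios);

// 2. Move the write end into an io_pipe_t on the producer's stdout...
pipe_write = std::make_shared<io_pipe_t>(p->pipe_write_fd, false /* not input */,
                                         std::move(local_pipes->write));

// 3. ...and hand the read end to the next command, where it becomes that
//    process's stdin pipe. No raw fd is ever left without an owner.
*out_pipe_next_read = std::move(local_pipes->read);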


@@ -31,10 +31,6 @@ int exec_subshell(const wcstring &cmd, parser_t &parser, bool preserve_exit_stat
 /// Loops over close until the syscall was run without being interrupted.
 void exec_close(int fd);
 
-/// Call pipe(), and add resulting fds to open_fds, the list of opened file descriptors for pipes.
-/// The pipes are marked CLO_EXEC.
-int exec_pipe(int fd[2]);
-
 /// Gets the interpreter for a given command.
 char *get_interpreter(const char *command, char *interpreter, size_t buff_size);


@@ -920,8 +920,7 @@ static void test_parser() {
 }
 
 static void test_1_cancellation(const wchar_t *src) {
-    shared_ptr<io_buffer_t> out_buff(io_buffer_t::create(STDOUT_FILENO, io_chain_t()));
-    const io_chain_t io_chain{out_buff};
+    auto filler = io_bufferfill_t::create(io_chain_t{});
     pthread_t thread = pthread_self();
     double delay = 0.25 /* seconds */;
     iothread_perform([=]() {
@@ -929,11 +928,13 @@ static void test_1_cancellation(const wchar_t *src) {
         usleep(delay * 1E6);
         pthread_kill(thread, SIGINT);
     });
-    parser_t::principal_parser().eval(src, io_chain, TOP);
-    out_buff->read();
-    if (out_buff->buffer().size() != 0) {
+    parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP);
+    auto buffer = filler->buffer();
+    filler.reset();
+    buffer->read_to_wouldblock();
+    if (buffer->buffer().size() != 0) {
         err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n",
-            out_buff->buffer().size());
+            buffer->buffer().size());
     }
     iothread_drain_all();
 }


@@ -22,14 +22,10 @@ void io_fd_t::print() const { fwprintf(stderr, L"FD map %d -> %d\n", old_fd, fd)
 void io_file_t::print() const { fwprintf(stderr, L"file (%s)\n", filename_cstr); }
 
 void io_pipe_t::print() const {
-    fwprintf(stderr, L"pipe {%d, %d} (input: %s)\n", pipe_fd[0], pipe_fd[1],
-             is_input ? "yes" : "no");
+    fwprintf(stderr, L"pipe {%d} (input: %s)\n", pipe_fd(), is_input_ ? "yes" : "no");
 }
 
-void io_buffer_t::print() const {
-    fwprintf(stderr, L"buffer (input: %s, size %lu)\n",
-             is_input ? "yes" : "no", (unsigned long)buffer_.size());
-}
+void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); }
 
 void io_buffer_t::append_from_stream(const output_stream_t &stream) {
     if (buffer_.discarded()) return;
@@ -40,75 +36,84 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) {
     buffer_.append_wide_buffer(stream.buffer());
 }
 
-void io_buffer_t::read() {
-    exec_close(pipe_fd[1]);
-
-    if (io_mode == io_mode_t::buffer) {
-        debug(4, L"io_buffer_t::read: blocking read on fd %d", pipe_fd[0]);
-        while (1) {
-            char b[4096];
-            long len = read_blocked(pipe_fd[0], b, 4096);
-            if (len == 0) {
-                break;
-            } else if (len < 0) {
-                // exec_read_io_buffer is only called on jobs that have exited, and will therefore
-                // never block. But a broken pipe seems to cause some flags to reset, causing the
-                // EOF flag to not be set. Therefore, EAGAIN is ignored and we exit anyway.
-                if (errno != EAGAIN) {
-                    const wchar_t *fmt =
-                        _(L"An error occured while reading output from code block on fd %d");
-                    debug(1, fmt, pipe_fd[0]);
-                    wperror(L"io_buffer_t::read");
-                }
-                break;
-            } else {
-                buffer_.append(&b[0], &b[len]);
-            }
-        }
-    }
+long io_buffer_t::read_some() {
+    int fd = read_.fd();
+    assert(fd >= 0 && "Should have a valid fd");
+    debug(4, L"io_buffer_t::read: blocking read on fd %d", fd);
+    long len;
+    char b[4096];
+    do {
+        len = read(fd, b, sizeof b);
+    } while (len < 0 && errno == EINTR);
+    if (len > 0) {
+        buffer_.append(&b[0], &b[len]);
+    }
+    return len;
+}
+
+void io_buffer_t::read_to_wouldblock() {
+    long len;
+    do {
+        len = read_some();
+    } while (len > 0);
+    if (len < 0 && errno != EAGAIN) {
+        debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd());
+        wperror(L"io_buffer_t::read");
+    }
 }
 
-bool io_buffer_t::avoid_conflicts_with_io_chain(const io_chain_t &ios) {
-    bool result = pipe_avoid_conflicts_with_io_chain(this->pipe_fd, ios);
-    if (!result) {
-        wperror(L"dup");
+bool io_buffer_t::try_read(unsigned long timeout_usec) {
+    struct timeval timeout;
+    timeout.tv_sec = 0;
+    timeout.tv_usec = timeout_usec;
+    int fd = read_.fd();
+    assert(fd >= 0 && "Should have a valid fd");
+    fd_set fds;
+    FD_ZERO(&fds);
+    FD_SET(fd, &fds);
+    int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout);
+    if (ret < 0) {
+        // Treat EINTR as timeout.
+        if (errno != EINTR) {
+            debug(1, _(L"An error occured inside select on fd %d"), fd);
+            wperror(L"io_buffer_t::try_read");
+        }
+        return false;
     }
-    return result;
+    if (ret > 0) {
+        read_some();
+    }
+    return ret > 0;
 }
 
-shared_ptr<io_buffer_t> io_buffer_t::create(int fd, const io_chain_t &conflicts,
-                                            size_t buffer_limit) {
-    bool success = true;
-    assert(fd >= 0);
-    shared_ptr<io_buffer_t> buffer_redirect(new io_buffer_t(fd, buffer_limit));
-
-    if (exec_pipe(buffer_redirect->pipe_fd) == -1) {
-        debug(1, PIPE_ERROR);
-        wperror(L"pipe");
-        success = false;
-    } else if (!buffer_redirect->avoid_conflicts_with_io_chain(conflicts)) {
-        // The above call closes the fds on error.
-        success = false;
-    } else if (make_fd_nonblocking(buffer_redirect->pipe_fd[0]) != 0) {
+shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
+                                                    size_t buffer_limit) {
+    // Construct our pipes.
+    auto pipes = make_autoclose_pipes(conflicts);
+    if (!pipes) {
+        return nullptr;
+    }
+    // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
+    // because we retain the write end of the pipe in this process (even after handing it off to a
+    // child process); therefore a read on the pipe may block forever. What we should do is arrange
+    // for the write end of the pipe to be closed at the right time; then the read could just block.
+    if (make_fd_nonblocking(pipes->read.fd())) {
         debug(1, PIPE_ERROR);
         wperror(L"fcntl");
-        success = false;
+        return nullptr;
     }
-
-    if (!success) {
-        buffer_redirect.reset();
-    }
-    return buffer_redirect;
+    // Our buffer gets the read end of the pipe; out_pipe gets the write end.
+    auto buffer = std::make_shared<io_buffer_t>(std::move(pipes->read), buffer_limit);
+    return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer);
 }
 
-io_buffer_t::~io_buffer_t() {
-    if (pipe_fd[0] >= 0) {
-        exec_close(pipe_fd[0]);
-    }
-    // Dont free fd for writing. This should already be free'd before calling exec_read_io_buffer on
-    // the buffer.
-}
+io_pipe_t::~io_pipe_t() = default;
+
+io_bufferfill_t::~io_bufferfill_t() = default;
+
+io_buffer_t::~io_buffer_t() = default;
 
 void io_chain_t::remove(const shared_ptr<const io_data_t> &element) {
     // See if you can guess why std::find doesn't work here.
@@ -197,7 +202,7 @@ int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) {
     return new_fd;
 }
 
-bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
+static bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
     bool success = true;
     for (int i = 0; i < 2; i++) {
         fds[i] = move_fd_to_unused(fds[i], ios);
@@ -221,6 +226,27 @@ bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
     return success;
 }
 
+maybe_t<autoclose_pipes_t> make_autoclose_pipes(const io_chain_t &ios) {
+    int pipes[2] = {-1, -1};
+
+    if (pipe(pipes) < 0) {
+        debug(1, PIPE_ERROR);
+        wperror(L"pipe");
+        return none();
+    }
+    set_cloexec(pipes[0]);
+    set_cloexec(pipes[1]);
+
+    if (!pipe_avoid_conflicts_with_io_chain(pipes, ios)) {
+        // The pipes are closed on failure here.
+        return none();
+    }
+    autoclose_pipes_t result;
+    result.read = autoclose_fd_t(pipes[0]);
+    result.write = autoclose_fd_t(pipes[1]);
+    return {std::move(result)};
+}
+
 /// Return the last IO for the given fd.
 shared_ptr<const io_data_t> io_chain_t::get_io_for_fd(int fd) const {
     size_t idx = this->size();
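
The comment in io_bufferfill_t::create captures the key subtlety: the shell keeps the write end of the pipe alive in its own process, so a blocking read on the read end could hang forever. A standalone sketch of the same pattern (an assumed demo, not code from the commit), draining a non-blocking read end until EAGAIN just as read_to_wouldblock() does:

#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <string>

int main() {
    int fds[2];
    if (pipe(fds) < 0) return 1;
    // The read end must be non-blocking: this process still holds the write
    // end, so EOF will never arrive while we do.
    fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);
    write(fds[1], "hello", 5);

    std::string buffer;
    char b[4096];
    long len;
    do {
        do {
            len = read(fds[0], b, sizeof b);
        } while (len < 0 && errno == EINTR);  // retry interrupted reads
        if (len > 0) buffer.append(b, len);
    } while (len > 0);
    // Here len < 0 with errno == EAGAIN: nothing more to read right now, but
    // not EOF either, because the write end is still open in this process.
    close(fds[0]);
    close(fds[1]);
    return buffer == "hello" ? 0 : 1;
}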

src/io.h

@@ -150,7 +150,7 @@ public:
 };
 
 /// Describes what type of IO operation an io_data_t represents.
-enum class io_mode_t { file, pipe, fd, buffer, close };
+enum class io_mode_t { file, pipe, fd, close, bufferfill };
 
 /// Represents an FD redirection.
 class io_data_t {
@@ -210,39 +210,83 @@ class io_file_t : public io_data_t {
     ~io_file_t() override { free((void *)filename_cstr); }
 };
 
+/// Represents (one end) of a pipe.
 class io_pipe_t : public io_data_t {
-   protected:
-    io_pipe_t(io_mode_t m, int f, bool i) : io_data_t(m, f), is_input(i) {
-        pipe_fd[0] = pipe_fd[1] = -1;
-    }
+    // The pipe's fd. Conceptually this is dup2'd to io_data_t::fd.
+    autoclose_fd_t pipe_fd_;
+
+    /// Whether this is an input pipe. This is used only for informational purposes.
+    const bool is_input_;
 
    public:
-    int pipe_fd[2];
-    const bool is_input;
-
     void print() const override;
 
-    io_pipe_t(int f, bool i) : io_data_t(io_mode_t::pipe, f), is_input(i) {
-        pipe_fd[0] = pipe_fd[1] = -1;
-    }
+    io_pipe_t(int fd, bool is_input, autoclose_fd_t pipe_fd)
+        : io_data_t(io_mode_t::pipe, fd), pipe_fd_(std::move(pipe_fd)), is_input_(is_input) {}
+
+    ~io_pipe_t();
+
+    int pipe_fd() const { return pipe_fd_.fd(); }
 };
 
+class io_buffer_t;
 class io_chain_t;
 
+/// Represents filling an io_buffer_t. Very similar to io_pipe_t.
+/// Bufferfills always target stdout.
+class io_bufferfill_t : public io_data_t {
+    /// Write end. The other end is connected to an io_buffer_t.
+    const autoclose_fd_t write_fd_;
+
+    /// The receiving buffer.
+    const std::shared_ptr<io_buffer_t> buffer_;
+
+   public:
+    void print() const override;
+
+    io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr<io_buffer_t> buffer)
+        : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO),
+          write_fd_(std::move(write_fd)),
+          buffer_(std::move(buffer)) {}
+
+    ~io_bufferfill_t();
+
+    std::shared_ptr<io_buffer_t> buffer() const { return buffer_; }
+
+    int write_fd() const { return write_fd_.fd(); }
+
+    /// Create an io_bufferfill_t which, when written from, populates a buffer (also created).
+    /// \returns nullptr on failure, e.g. too many open fds.
+    ///
+    /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
+    /// not conflict with an fd redirection in this list.
+    static shared_ptr<io_bufferfill_t> create(const io_chain_t &conflicts, size_t buffer_limit = 0);
+};
+
 class output_stream_t;
 
-class io_buffer_t : public io_pipe_t {
+/// An io_buffer_t is a buffer which can populate itself by reading from an fd.
+/// It is not an io_data_t.
+class io_buffer_t {
    private:
+    /// fd from which to read.
+    autoclose_fd_t read_;
+
+    /// Buffer storing what we have read.
     separated_buffer_t<std::string> buffer_;
 
-    explicit io_buffer_t(int f, size_t limit)
-        : io_pipe_t(io_mode_t::buffer, f, false /* not input */), buffer_(limit) {
+    /// Read some. Append it to our buffer.
+    /// \return positive if we read, 0 on EOF, -1 on error.
+    long read_some();
+
+   public:
+    explicit io_buffer_t(autoclose_fd_t read, size_t limit)
+        : read_(std::move(read)), buffer_(limit) {
         // Explicitly reset the discard flag because we share this buffer.
         buffer_.reset_discard();
     }
 
-   public:
-    void print() const override;
-
-    ~io_buffer_t() override;
+    ~io_buffer_t();
 
     /// Access the underlying buffer.
     const separated_buffer_t<std::string> &buffer() const { return buffer_; }
@@ -250,24 +294,16 @@ class io_buffer_t : public io_pipe_t {
     /// Function to append to the buffer.
     void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); }
 
-    /// Ensures that the pipes do not conflict with any fd redirections in the chain.
-    bool avoid_conflicts_with_io_chain(const io_chain_t &ios);
+    /// Read from input pipe until EOF or EAGAIN (i.e. would block).
+    void read_to_wouldblock();
 
-    /// Close output pipe, and read from input pipe until eof.
-    void read();
+    /// Read a bit, if our fd is readable, with the given timeout.
+    /// \return true if we read some, false on timeout.
+    bool try_read(unsigned long timeout_usec);
 
     /// Appends data from a given output_stream_t.
     /// Marks the receiver as discarded if the stream was discarded.
     void append_from_stream(const output_stream_t &stream);
-
-    /// Create a io_mode_t::buffer type io redirection, complete with a pipe and a vector<char> for
-    /// output. The default file descriptor used is STDOUT_FILENO for buffering.
-    ///
-    /// \param fd the fd that will be mapped in the child process, typically STDOUT_FILENO
-    /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
-    /// not conflict with an fd redirection in this list.
-    static shared_ptr<io_buffer_t> create(int fd, const io_chain_t &conflicts,
-                                          size_t buffer_limit = 0);
 };
 
 class io_chain_t : public std::vector<shared_ptr<io_data_t>> {
@@ -289,11 +325,18 @@ class io_chain_t : public std::vector<shared_ptr<io_data_t>> {
 shared_ptr<const io_data_t> io_chain_get(const io_chain_t &src, int fd);
 shared_ptr<io_data_t> io_chain_get(io_chain_t &src, int fd);
 
-/// Given a pair of fds, if an fd is used by the given io chain, duplicate that fd repeatedly until
-/// we find one that does not conflict, or we run out of fds. Returns the new fds by reference,
-/// closing the old ones. If we get an error, returns false (in which case both fds are closed and
-/// set to -1).
-bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios);
+/// Helper type returned from making autoclose pipes.
+struct autoclose_pipes_t {
+    /// Read end of the pipe.
+    autoclose_fd_t read;
+
+    /// Write end of the pipe.
+    autoclose_fd_t write;
+};
+
+/// Call pipe(), populating autoclose fds, avoiding conflicts.
+/// The pipes are marked CLO_EXEC.
+/// \return pipes on success, none() on error.
+maybe_t<autoclose_pipes_t> make_autoclose_pipes(const io_chain_t &ios);
 
 /// If the given fd is used by the io chain, duplicates it repeatedly until an fd not used in the io
 /// chain is found, or we run out. If we return a new fd or an error, closes the old one.
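
Putting the new header API together, the intended lifecycle (pieced together from the fish_tests.cpp hunk earlier; a usage summary, not text from this header) is:

// 1. Create the fill side; this also creates the pipe and the receiving buffer.
auto filler = io_bufferfill_t::create(io_chain_t{});

// 2. Run something with the bufferfill in its IO chain; anything written to
//    stdout lands in the pipe.
parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP);

// 3. Keep the buffer, drop the fill side (releasing the write end), then
//    drain whatever was written.
auto buffer = filler->buffer();
filler.reset();
buffer->read_to_wouldblock();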


@@ -837,86 +837,24 @@ void proc_update_jiffies() {
 #endif
 
 /// The return value of select_try(), indicating IO readiness or an error
-enum class select_try_t {
-    /// One or more fds have data ready for read
-    DATA_READY,
-    /// The timeout elapsed without any data becoming available for read
+enum class block_receive_try_t {
+    /// There is no buffer to select on.
+    NO_BUFFER,
+    /// We have a block buffer, and we read some.
+    DATA_READ,
+    /// We have a block buffer, but we were unable to read any.
     TIMEOUT,
-    /// There were no FDs in the io chain for which to select on.
-    IOCHAIN_EMPTY,
 };
 
-/// Check if there are buffers associated with the job, and select on them for a while if available.
-///
-/// \param j the job to test
-/// \return the status of the select operation
-static select_try_t select_try(job_t *j) {
-    fd_set fds;
-    int maxfd = -1;
-
-    FD_ZERO(&fds);
-
-    const io_chain_t chain = j->all_io_redirections();
-    for (const auto &io : chain) {
-        if (io->io_mode == io_mode_t::buffer) {
-            auto io_pipe = static_cast<const io_pipe_t *>(io.get());
-            int fd = io_pipe->pipe_fd[0];
-            FD_SET(fd, &fds);
-            maxfd = std::max(maxfd, fd);
-            debug(4, L"select_try on fd %d", fd);
-        }
-    }
-
-    if (maxfd >= 0) {
-        struct timeval timeout;
-
-        timeout.tv_sec = 0;
-        timeout.tv_usec = 10000;
-
-        int retval = select(maxfd + 1, &fds, 0, 0, &timeout);
-        if (retval == 0) {
-            debug(4, L"select_try hit timeout");
-            return select_try_t::TIMEOUT;
-        }
-        return select_try_t::DATA_READY;
-    }
-
-    return select_try_t::IOCHAIN_EMPTY;
-}
-
-/// Read from descriptors until they are empty.
-///
-/// \param j the job to test
-static void read_try(job_t *j) {
-    io_buffer_t *buff = NULL;
-
-    // Find the last buffer, which is the one we want to read from.
-    const io_chain_t chain = j->all_io_redirections();
-    for (size_t idx = 0; idx < chain.size(); idx++) {
-        io_data_t *d = chain.at(idx).get();
-        if (d->io_mode == io_mode_t::buffer) {
-            buff = static_cast<io_buffer_t *>(d);
-        }
-    }
-
-    if (buff) {
-        debug(4, L"proc::read_try('%ls')", j->command_wcstr());
-        while (1) {
-            char b[BUFFER_SIZE];
-            long len = read_blocked(buff->pipe_fd[0], b, BUFFER_SIZE);
-            if (len == 0) {
-                break;
-            } else if (len < 0) {
-                if (errno != EAGAIN) {
-                    debug(1, _(L"An error occured while reading output from code block"));
-                    wperror(L"read_try");
-                }
-                break;
-            } else {
-                buff->append(b, len);
-            }
-        }
-    }
-}
+/// \return the last IO buffer in job j, or nullptr if none.
+std::shared_ptr<io_buffer_t> last_buffer(job_t *j) {
+    std::shared_ptr<io_buffer_t> buff{};
+    for (const auto &io : j->all_io_redirections()) {
+        if (io->io_mode == io_mode_t::bufferfill) {
+            buff = static_cast<io_bufferfill_t *>(io.get())->buffer();
+        }
+    }
+    return buff;
+}
 
 // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring
@@ -1154,26 +1092,27 @@ void job_t::continue_job(bool send_sigcont) {
 
             // Wait for data to become available or the status of our own job to change
             while (!reader_exit_forced() && !is_stopped() && !is_completed()) {
-                auto result = select_try(this);
                 read_attempted = true;
 
-                switch (result) {
-                    case select_try_t::DATA_READY:
-                        // Read the data that we know is now available, then scan for finished processes
-                        // but do not block. We don't block so long as we have IO to process, once the
-                        // fd buffers are empty we'll block in the second case below.
-                        read_try(this);
+                auto read_result = block_receive_try_t::NO_BUFFER;
+                if (auto buff = last_buffer(this)) {
+                    const unsigned long SELECT_TIMEOUT_USEC = 10000;
+                    bool did_read = buff->try_read(SELECT_TIMEOUT_USEC);
+                    read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT;
+                }
+
+                switch (read_result) {
+                    case block_receive_try_t::DATA_READ:
+                        // We read some data.
                         process_mark_finished_children(false);
                         break;
 
-                    case select_try_t::TIMEOUT:
-                        // No FDs are ready. Look for finished processes instead.
+                    case block_receive_try_t::TIMEOUT:
+                        // We read some data or timed out. Poll for finished processes.
                         debug(4, L"select_try: no fds returned valid data within the timeout" );
                         process_mark_finished_children(block_on_fg);
                         break;
 
-                    case select_try_t::IOCHAIN_EMPTY:
-                        // There were no IO fds to select on.
+                    case block_receive_try_t::NO_BUFFER:
+                        // We are not populating a buffer.
                         debug(4, L"select_try: no IO fds" );
                         process_mark_finished_children(true);
@@ -1195,7 +1134,9 @@ void job_t::continue_job(bool send_sigcont) {
         // `echo` calls were sometimes having their output combined with the `set_color` calls
         // in the wrong order!
         if (!read_attempted) {
-            read_try(this);
+            if (auto buff = last_buffer(this)) {
+                buff->read_to_wouldblock();
+            }
         }
 
         // Set $status only if we are in the foreground and the last process in the job has


@@ -73,15 +73,17 @@ maybe_t<dup2_list_t> dup2_list_t::resolve_chain(const io_chain_t &io_chain) {
                 break;
             }
 
-            case io_mode_t::buffer:
             case io_mode_t::pipe: {
                 const io_pipe_t *io = static_cast<const io_pipe_t *>(io_ref.get());
-                // If write_pipe_idx is 0, it means we're connecting to the read end (first pipe
-                // fd). If it's 1, we're connecting to the write end (second pipe fd).
-                unsigned int write_pipe_idx = (io->is_input ? 0 : 1);
-                result.add_dup2(io->pipe_fd[write_pipe_idx], io->fd);
-                if (io->pipe_fd[0] >= 0) result.add_close(io->pipe_fd[0]);
-                if (io->pipe_fd[1] >= 0) result.add_close(io->pipe_fd[1]);
+                result.add_dup2(io->pipe_fd(), io->fd);
+                result.add_close(io->pipe_fd());
+                break;
+            }
+
+            case io_mode_t::bufferfill: {
+                const io_bufferfill_t *io = static_cast<const io_bufferfill_t *>(io_ref.get());
+                result.add_dup2(io->write_fd(), io->fd);
+                result.add_close(io->write_fd());
                 break;
             }
         }
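
In the child, the resolved list for a bufferfill reduces to a classic dup2-then-close pair. A standalone sketch of what those two entries do (an assumed demo, not fish code):

#include <unistd.h>

// Point stdout at the pipe's write end, then drop the original descriptor so
// the child holds exactly one reference to the pipe.
static void apply_bufferfill_redirection(int write_fd) {
    dup2(write_fd, STDOUT_FILENO);
    close(write_fd);
}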