builtins to write to buffers directly

This concerns builtins writing to an io_buffer_t. io_buffer_t is how fish
captures output, especially in command substitutions:

    set STUFF (string upper stuff)

Recall that io_buffer_t fills itself by reading from an fd (typically
connected to stdout of the command). However if our command is a builtin,
then we can write to the buffer directly.

Prior to this change, when a builtin anticipated writing to an
io_buffer_t, it would first write into an internal buffer, and then after
the builtin was finished, we would copy it to the io_buffer_t. This was
because we didn't have a polymorphic receiver for builtin output: we
always buffered it and then directed it to the io_buffer_t or file
descriptor or stdout or whatever.

Now that we have polymorphpic io_streams_t, we can notice ahead of time
that the builtin output is destined for an internal buffer and have it
just write directly to that buffer. This saves a buffering step, which is
a nice simplification.
This commit is contained in:
ridiculousfish 2021-02-04 15:18:34 -08:00
parent cd9a035f02
commit 7d494eab5c
6 changed files with 65 additions and 108 deletions

View file

@ -64,6 +64,8 @@
struct termios shell_modes;
const wcstring g_empty_string{};
/// This allows us to notice when we've forked.
static relaxed_atomic_bool_t is_forked_proc{false};
/// This allows us to bypass the main thread checks
@ -409,6 +411,7 @@ wcstring str2wcstring(const std::string &in, size_t len) {
std::string wcs2string(const wcstring &input) { return wcs2string(input.data(), input.size()); }
std::string wcs2string(const wchar_t *in, size_t len) {
if (len == 0) return std::string{};
std::string result;
result.reserve(len);
wcs2string_callback(in, len, [&](const char *buff, size_t bufflen) {

View file

@ -201,6 +201,10 @@ extern const wchar_t *program_name;
/// Set to false if it's been determined we can't trust the last modified timestamp on the tty.
extern const bool has_working_tty_timestamps;
/// A global, empty string. This is useful for functions which wish to return a reference to an
/// empty string.
extern const wcstring g_empty_string;
// Pause for input, then exit the program. If supported, print a backtrace first.
// The `return` will never be run but silences oclint warnings. Especially when this is called
// from within a `switch` block. As of the time I'm writing this oclint doesn't recognize the

View file

@ -441,8 +441,7 @@ static launch_result_t exec_internal_builtin_proc(parser_t &parser, process_t *p
/// \return an newly allocated output stream for the given fd, which is typically stdout or stderr.
/// This inspects the io_chain and decides what sort of output stream to return.
static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(size_t read_limit,
const io_chain_t &io_chain,
static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(const io_chain_t &io_chain,
int fd) {
const shared_ptr<const io_data_t> io = io_chain.io_for_fd(fd);
if (io == nullptr) {
@ -451,9 +450,13 @@ static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(size_t
return make_unique<fd_output_stream_t>(fd);
}
switch (io->io_mode) {
case io_mode_t::bufferfill:
// Write to a buffer.
return make_unique<buffered_output_stream_t>(read_limit);
case io_mode_t::bufferfill: {
// Our IO redirection is to an internal buffer, e.g. a command substitution.
// We will write directly to it.
std::shared_ptr<io_buffer_t> buffer =
dynamic_cast<const io_bufferfill_t *>(io.get())->buffer();
return make_unique<buffered_output_stream_t>(buffer);
}
case io_mode_t::close:
return make_unique<null_output_stream_t>();
@ -462,7 +465,7 @@ static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(size_t
case io_mode_t::file:
case io_mode_t::pipe:
case io_mode_t::fd:
return make_unique<buffered_output_stream_t>(read_limit);
return make_unique<string_output_stream_t>();
}
DIE("Unreachable");
}
@ -470,75 +473,17 @@ static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(size_t
/// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections
/// given in io_chain.
static void handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t> &j, process_t *p,
io_chain_t *io_chain, const io_streams_t &builtin_io_streams) {
io_chain_t *io_chain, const io_streams_t &streams) {
assert(p->type == process_type_t::builtin && "Process is not a builtin");
const separated_buffer_t<wcstring> *output_buffer =
builtin_io_streams.out.get_separated_buffer();
const separated_buffer_t<wcstring> *errput_buffer =
builtin_io_streams.err.get_separated_buffer();
// Mark if we discarded output.
if (output_buffer && output_buffer->discarded()) {
if (streams.out.discarded() || streams.err.discarded()) {
p->status = proc_status_t::from_exit_code(STATUS_READ_TOO_MUCH);
}
const shared_ptr<const io_data_t> stdout_io = io_chain->io_for_fd(STDOUT_FILENO);
const shared_ptr<const io_data_t> stderr_io = io_chain->io_for_fd(STDERR_FILENO);
// If we are directing output to a buffer, then we can just transfer it directly without needing
// to write to the bufferfill pipe. Note this is how we handle explicitly separated stdout
// output (i.e. string split0) which can't really be sent through a pipe.
bool stdout_done = false;
if (output_buffer && stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
auto stdout_buffer = dynamic_cast<const io_bufferfill_t *>(stdout_io.get())->buffer();
stdout_buffer->append_from_wide_buffer(*output_buffer);
stdout_done = true;
}
bool stderr_done = false;
if (errput_buffer && stderr_io && stderr_io->io_mode == io_mode_t::bufferfill) {
auto stderr_buffer = dynamic_cast<const io_bufferfill_t *>(stderr_io.get())->buffer();
stderr_buffer->append_from_wide_buffer(*errput_buffer);
stderr_done = true;
}
// Figure out any data remaining to write. We may have none in which case we can short-circuit.
std::string outbuff;
if (output_buffer && !stdout_done) {
outbuff = wcs2string(output_buffer->newline_serialized());
}
std::string errbuff;
if (errput_buffer && !stderr_done) {
errbuff = wcs2string(errput_buffer->newline_serialized());
}
// If we have no redirections for stdout/stderr, just write them directly.
if (!stdout_io && !stderr_io) {
bool did_err = false;
if (write_loop(STDOUT_FILENO, outbuff.data(), outbuff.size()) < 0) {
if (errno != EPIPE) {
did_err = true;
FLOG(error, L"Error while writing to stdout");
wperror(L"write_loop");
}
}
if (write_loop(STDERR_FILENO, errbuff.data(), errbuff.size()) < 0) {
if (errno != EPIPE && !did_err) {
did_err = true;
FLOG(error, L"Error while writing to stderr");
wperror(L"write_loop");
}
}
if (did_err) {
redirect_tty_output(); // workaround glibc bug
FLOG(error, L"!builtin_io_done and errno != EPIPE");
show_stackframe(L'E');
}
// Clear the buffers to indicate we finished.
outbuff.clear();
errbuff.clear();
}
// Figure out any data remaining to write. We may have none, in which case we can short-circuit.
std::string outbuff = wcs2string(streams.out.contents());
std::string errbuff = wcs2string(streams.err.contents());
// Some historical behavior.
if (!outbuff.empty()) fflush(stdout);
@ -863,11 +808,10 @@ static launch_result_t exec_process_in_job(parser_t &parser, process_t *p,
}
case process_type_t::builtin: {
size_t read_limit = parser.libdata().read_limit;
std::unique_ptr<output_stream_t> output_stream =
create_output_stream_for_builtin(read_limit, process_net_io_chain, STDOUT_FILENO);
create_output_stream_for_builtin(process_net_io_chain, STDOUT_FILENO);
std::unique_ptr<output_stream_t> errput_stream =
create_output_stream_for_builtin(read_limit, process_net_io_chain, STDERR_FILENO);
create_output_stream_for_builtin(process_net_io_chain, STDERR_FILENO);
io_streams_t builtin_io_streams{*output_stream, *errput_stream};
builtin_io_streams.job_group = j->group;

View file

@ -5366,7 +5366,7 @@ maybe_t<int> builtin_string(parser_t &parser, io_streams_t &streams, wchar_t **a
static void run_one_string_test(const wchar_t *const *argv, int expected_rc,
const wchar_t *expected_out) {
parser_t &parser = parser_t::principal_parser();
buffered_output_stream_t outs{0};
string_output_stream_t outs{};
null_output_stream_t errs{};
io_streams_t streams(outs, errs);
streams.stdin_is_directly_redirected = false; // read from argv instead of stdin

View file

@ -57,17 +57,6 @@ void io_bufferfill_t::print() const {
std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
}
void io_buffer_t::append_from_wide_buffer(const separated_buffer_t<wcstring> &input) {
if (input.elements().empty() && !input.discarded()) return;
scoped_lock locker(append_lock_);
if (buffer_.discarded()) return;
if (input.discarded()) {
buffer_.set_discard();
return;
}
buffer_.append_wide_buffer(input);
}
ssize_t io_buffer_t::read_once(int fd) {
assert(fd >= 0 && "Invalid fd");
ASSERT_IS_LOCKED(append_lock_);
@ -361,6 +350,8 @@ void output_stream_t::append_with_separation(const wchar_t *s, size_t len, separ
}
}
const wcstring &output_stream_t::contents() const { return g_empty_string; }
void fd_output_stream_t::append(const wchar_t *s, size_t amt) {
if (errored_) return;
int res = wwrite_to_fd(s, amt, this->fd_);
@ -375,9 +366,15 @@ void null_output_stream_t::append(const wchar_t *, size_t) {}
void string_output_stream_t::append(const wchar_t *s, size_t amt) { contents_.append(s, amt); }
void buffered_output_stream_t::append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); }
const wcstring &string_output_stream_t::contents() const { return contents_; }
void buffered_output_stream_t::append(const wchar_t *s, size_t amt) {
buffer_->append(wcs2string(s, amt));
}
void buffered_output_stream_t::append_with_separation(const wchar_t *s, size_t len,
separation_type_t type) {
buffer_.append(s, s + len, type);
buffer_->append(wcs2string(s, len), type);
}
bool buffered_output_stream_t::discarded() const { return buffer_->discarded(); }

View file

@ -318,8 +318,8 @@ class io_buffer_t {
/// The item id of our background fillthread fd monitor item.
uint64_t item_id_{0};
/// Lock for appending.
std::mutex append_lock_{};
/// Lock for appending. Mutable since we take it in const functions.
mutable std::mutex append_lock_{};
/// Read some, filling the buffer. The append lock must be held.
/// \return positive on success, 0 if closed, -1 on error (in which case errno will be set).
@ -347,14 +347,22 @@ class io_buffer_t {
}
/// Function to append to the buffer.
void append(const char *ptr, size_t count) {
void append(const char *ptr, size_t count,
separation_type_t type = separation_type_t::inferred) {
scoped_lock locker(append_lock_);
buffer_.append(ptr, ptr + count);
buffer_.append(ptr, ptr + count, type);
}
/// Appends data from a given separated buffer.
/// Marks the receiver as discarded if the buffer was discarded.
void append_from_wide_buffer(const separated_buffer_t<wcstring> &input);
/// \return true if output was discarded due to exceeding the read limit.
bool discarded() const {
scoped_lock locker(append_lock_);
return buffer_.discarded();
}
void append(std::string &&str, separation_type_t type = separation_type_t::inferred) {
scoped_lock locker(append_lock_);
buffer_.append(std::move(str), type);
}
};
using io_data_ref_t = std::shared_ptr<const io_data_t>;
@ -413,8 +421,13 @@ class output_stream_t {
/// Required override point. The output stream receives a string \p s with \p amt chars.
virtual void append(const wchar_t *s, size_t amt) = 0;
/// \return the separated buffer if this holds one, otherwise nullptr.
virtual const separated_buffer_t<wcstring> *get_separated_buffer() const { return nullptr; }
/// \return true if output was discarded. This only applies to buffered output streams.
virtual bool discarded() const { return false; }
/// \return any internally buffered contents.
/// This is only implemented for a string_output_stream; others flush data to their underlying
/// receiver (fd, or separated buffer) immediately and so will return an empty string here.
virtual const wcstring &contents() const;
/// An optional override point. This is for explicit separation.
virtual void append_with_separation(const wchar_t *s, size_t len, separation_type_t type);
@ -482,31 +495,27 @@ class string_output_stream_t final : public output_stream_t {
void append(const wchar_t *s, size_t amt) override;
/// \return the wcstring containing the output.
const wcstring &contents() const { return contents_; }
const wcstring &contents() const override;
private:
wcstring contents_;
};
/// An output stream for builtins which buffers into a separated buffer.
/// An output stream for builtins which writes into a separated buffer.
class buffered_output_stream_t final : public output_stream_t {
public:
explicit buffered_output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {}
explicit buffered_output_stream_t(std::shared_ptr<io_buffer_t> buffer)
: buffer_(std::move(buffer)) {
assert(buffer_ && "Buffer must not be null");
}
void append(const wchar_t *s, size_t amt) override;
void append_with_separation(const wchar_t *s, size_t len, separation_type_t type) override;
wcstring contents() const { return buffer_.newline_serialized(); }
/// Access the buffer.
separated_buffer_t<wcstring> &buffer() { return buffer_; }
const separated_buffer_t<wcstring> &buffer() const { return buffer_; }
const separated_buffer_t<wcstring> *get_separated_buffer() const override { return &buffer_; }
bool discarded() const override;
private:
/// Storage for our data.
separated_buffer_t<wcstring> buffer_;
/// The buffer we are filling.
std::shared_ptr<io_buffer_t> buffer_;
};
struct io_streams_t {