Do not buffer builtin output if avoidable

builtins output to stdout and stderr via io_streams_t. Prior to this fix, it
contained an output_stream_t which just wraps a buffer. So all builtin output
went to this buffer (except for eval).

Switch output_stream_t to become a new abstract class which can output to a
buffer, file descriptor, or nowhere. This allows for example `string` to stream
its output as it is produced, instead of buffering it.
This commit is contained in:
ridiculousfish 2020-07-29 16:03:29 -07:00
parent 68092c5d21
commit bcfc54fdaa
7 changed files with 174 additions and 51 deletions

View file

@ -20,6 +20,7 @@ Notable improvements and fixes
- A new ``fish_add_path`` helper function to add paths to $PATH without producing duplicates, to be used interactively or in ``config.fish`` (#6960).
- ``fish_preexec`` and ``fish_postexec`` events are no longer triggered for empty commands.
- The ``test`` builtin now better shows where an error occured (#6030).
- builtins may now output before all data is read. For example, `string replace` no longer has to read all of stdin before it can begin to output.
Syntax changes and new commands
-------------------------------

View file

@ -1185,7 +1185,6 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc
// Remove the last element if it is empty.
if (splits.back().empty()) splits.pop_back();
}
auto &buff = streams.out.buffer();
if (opts.fields.size() > 0) {
// Print nothing and return error if any of the supplied
// fields do not exist, unless `--allow-empty` is used.
@ -1199,12 +1198,13 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc
}
for (const auto &field : opts.fields) {
if (field - 1 < (long)splits.size()) {
buff.append(splits.at(field - 1), separation_type_t::explicitly);
streams.out.append_with_separation(splits.at(field - 1),
separation_type_t::explicitly);
}
}
} else {
for (const wcstring &split : splits) {
buff.append(split, separation_type_t::explicitly);
streams.out.append_with_separation(split, separation_type_t::explicitly);
}
}
}
@ -1228,20 +1228,21 @@ static int string_collect(parser_t &parser, io_streams_t &streams, int argc, wch
int retval = parse_opts(&opts, &optind, 0, argc, argv, parser, streams);
if (retval != STATUS_CMD_OK) return retval;
auto &buff = streams.out.buffer();
arg_iterator_t aiter(argv, optind, streams, /* don't split */ false);
size_t appended = 0;
while (const wcstring *arg = aiter.nextstr()) {
auto begin = arg->cbegin(), end = arg->cend();
const wchar_t *s = arg->c_str();
size_t len = arg->size();
if (!opts.no_trim_newlines) {
while (end > begin && *(end - 1) == L'\n') {
--end;
while (len > 0 && s[len - 1] == L'\n') {
len -= 1;
}
}
buff.append(begin, end, separation_type_t::explicitly);
streams.out.append_with_separation(s, len, separation_type_t::explicitly);
appended += len;
}
return buff.size() > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR;
return appended > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR;
}
// Helper function to abstract the repeat logic from string_repeat

View file

@ -419,17 +419,47 @@ static bool exec_internal_builtin_proc(parser_t &parser, process_t *p, const io_
return true; // "success"
}
/// \return an newly allocated output stream for the given fd, which is typically stdout or stderr.
/// This inspects the io_chain and decides what sort of output stream to return.
static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(const parser_t &parser,
const io_chain_t &io_chain,
int fd) {
const shared_ptr<const io_data_t> io = io_chain.io_for_fd(fd);
if (io == nullptr) {
// Common case of no redirections.
// Just write to the fd directly.
return make_unique<fd_output_stream_t>(fd);
}
switch (io->io_mode) {
case io_mode_t::bufferfill:
// Write to a buffer.
return make_unique<buffered_output_stream_t>(parser.libdata().read_limit);
case io_mode_t::close:
return make_unique<null_output_stream_t>();
// TODO: reconsider these.
case io_mode_t::file:
case io_mode_t::pipe:
case io_mode_t::fd:
return make_unique<buffered_output_stream_t>(parser.libdata().read_limit);
}
DIE("Unreachable");
}
/// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections
/// given in io_chain.
static bool handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t> &j, process_t *p,
io_chain_t *io_chain, const io_streams_t &builtin_io_streams) {
assert(p->type == process_type_t::builtin && "Process is not a builtin");
const output_stream_t &stdout_stream = builtin_io_streams.out;
const output_stream_t &stderr_stream = builtin_io_streams.err;
const separated_buffer_t<wcstring> *output_buffer =
builtin_io_streams.out.get_separated_buffer();
const separated_buffer_t<wcstring> *errput_buffer =
builtin_io_streams.err.get_separated_buffer();
// Mark if we discarded output.
if (stdout_stream.buffer().discarded()) {
if (output_buffer && output_buffer->discarded()) {
p->status = proc_status_t::from_exit_code(STATUS_READ_TOO_MUCH);
}
@ -446,15 +476,21 @@ static bool handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t>
// doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no
// need for a similar check for stderr.
bool stdout_done = false;
if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
if (output_buffer && stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
auto stdout_buffer = dynamic_cast<const io_bufferfill_t *>(stdout_io.get())->buffer();
stdout_buffer->append_from_stream(stdout_stream);
stdout_buffer->append_from_wide_buffer(*output_buffer);
stdout_done = true;
}
// Figure out any data remaining to write. We may have none in which case we can short-circuit.
std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents());
std::string errbuff = wcs2string(stderr_stream.contents());
std::string outbuff;
if (output_buffer && !stdout_done) {
outbuff = wcs2string(output_buffer->newline_serialized());
}
std::string errbuff;
if (errput_buffer) {
errbuff = wcs2string(errput_buffer->newline_serialized());
}
// If we have no redirections for stdout/stderr, just write them directly.
if (!stdout_io && !stderr_io) {
@ -819,7 +855,11 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, const std::share
}
case process_type_t::builtin: {
io_streams_t builtin_io_streams{parser.libdata().read_limit};
std::unique_ptr<output_stream_t> output_stream =
create_output_stream_for_builtin(parser, process_net_io_chain, STDOUT_FILENO);
std::unique_ptr<output_stream_t> errput_stream =
create_output_stream_for_builtin(parser, process_net_io_chain, STDERR_FILENO);
io_streams_t builtin_io_streams{*output_stream, *errput_stream};
builtin_io_streams.job_group = j->group;
if (!exec_internal_builtin_proc(parser, p, pipe_read.get(), process_net_io_chain,
builtin_io_streams)) {

View file

@ -2533,7 +2533,8 @@ static bool run_one_test_test(int expected, wcstring_list_t &lst, bool bracket)
i++;
}
argv[i + 1] = NULL;
io_streams_t streams(0);
null_output_stream_t null{};
io_streams_t streams(null, null);
int result = builtin_test(parser, streams, argv);
if (expected != result) err(L"expected builtin_test() to return %d, got %d", expected, result);
@ -2565,7 +2566,8 @@ static bool run_test_test(int expected, const wcstring &str) {
static void test_test_brackets() {
// Ensure [ knows it needs a ].
parser_t &parser = parser_t::principal_parser();
io_streams_t streams(0);
null_output_stream_t null{};
io_streams_t streams(null, null);
null_terminated_array_t<wchar_t> args;
@ -5100,7 +5102,9 @@ int builtin_string(parser_t &parser, io_streams_t &streams, wchar_t **argv);
static void run_one_string_test(const wchar_t *const *argv, int expected_rc,
const wchar_t *expected_out) {
parser_t &parser = parser_t::principal_parser();
io_streams_t streams(0);
buffered_output_stream_t outs{0};
null_output_stream_t errs{};
io_streams_t streams(outs, errs);
streams.stdin_is_directly_redirected = false; // read from argv instead of stdin
int rc = builtin_string(parser, streams, const_cast<wchar_t **>(argv));
wcstring args;
@ -5111,10 +5115,10 @@ static void run_one_string_test(const wchar_t *const *argv, int expected_rc,
if (rc != expected_rc) {
err(L"Test failed on line %lu: [%ls]: expected return code %d but got %d", __LINE__,
args.c_str(), expected_rc, rc);
} else if (streams.out.contents() != expected_out) {
} else if (outs.contents() != expected_out) {
err(L"Test failed on line %lu: [%ls]: expected [%ls] but got [%ls]", __LINE__, args.c_str(),
escape_string(expected_out, ESCAPE_ALL).c_str(),
escape_string(streams.out.contents(), ESCAPE_ALL).c_str());
escape_string(outs.contents(), ESCAPE_ALL).c_str());
}
}

View file

@ -57,8 +57,7 @@ void io_bufferfill_t::print() const {
std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
}
void io_buffer_t::append_from_stream(const output_stream_t &stream) {
const separated_buffer_t<wcstring> &input = stream.buffer();
void io_buffer_t::append_from_wide_buffer(const separated_buffer_t<wcstring> &input) {
if (input.elements().empty() && !input.discarded()) return;
scoped_lock locker(append_lock_);
if (buffer_.discarded()) return;
@ -349,6 +348,32 @@ shared_ptr<const io_data_t> io_chain_t::io_for_fd(int fd) const {
void output_stream_t::append_narrow_buffer(const separated_buffer_t<std::string> &buffer) {
for (const auto &rhs_elem : buffer.elements()) {
buffer_.append(str2wcstring(rhs_elem.contents), rhs_elem.separation);
append_with_separation(str2wcstring(rhs_elem.contents), rhs_elem.separation);
}
}
void output_stream_t::append_with_separation(const wchar_t *s, size_t len, separation_type_t type) {
append(s, len);
if (type == separation_type_t::explicitly) {
append(L'\n');
}
}
void fd_output_stream_t::append(const wchar_t *s, size_t amt) {
if (errored_) return;
int res = wwrite_to_fd(s, amt, this->fd_);
if (res < 0) {
// TODO: this error is too aggressive, e.g. if we got SIGINT we should not complain.
wperror(L"write");
errored_ = true;
}
}
void null_output_stream_t::append(const wchar_t *, size_t) {}
void buffered_output_stream_t::append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); }
void buffered_output_stream_t::append_with_separation(const wchar_t *s, size_t len,
separation_type_t type) {
buffer_.append(s, s + len, type);
}

View file

@ -344,9 +344,9 @@ class io_buffer_t {
buffer_.append(ptr, ptr + count);
}
/// Appends data from a given output_stream_t.
/// Marks the receiver as discarded if the stream was discarded.
void append_from_stream(const output_stream_t &stream);
/// Appends data from a given separated buffer.
/// Marks the receiver as discarded if the buffer was discarded.
void append_from_wide_buffer(const separated_buffer_t<wcstring> &input);
};
using io_data_ref_t = std::shared_ptr<const io_data_t>;
@ -398,36 +398,36 @@ maybe_t<autoclose_pipes_t> make_autoclose_pipes(const fd_set_t &fdset);
/// cloexec. \returns invalid fd on failure (in which case the given fd is still closed).
autoclose_fd_t move_fd_to_unused(autoclose_fd_t fd, const fd_set_t &fdset);
/// Class representing the output that a builtin can generate.
/// Base class representing the output that a builtin can generate.
/// This has various subclasses depending on the ultimate output destination.
class output_stream_t {
private:
/// Storage for our data.
separated_buffer_t<wcstring> buffer_;
// No copying.
output_stream_t(const output_stream_t &s) = delete;
void operator=(const output_stream_t &s) = delete;
public:
output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {}
/// Required override point. The output stream receives a string \p s with \p amt chars.
virtual void append(const wchar_t *s, size_t amt) = 0;
void append(const wcstring &s) { buffer_.append(s.begin(), s.end()); }
/// \return the separated buffer if this holds one, otherwise nullptr.
virtual const separated_buffer_t<wcstring> *get_separated_buffer() const { return nullptr; }
separated_buffer_t<wcstring> &buffer() { return buffer_; }
/// An optional override point. This is for explicit separation.
virtual void append_with_separation(const wchar_t *s, size_t len, separation_type_t type);
const separated_buffer_t<wcstring> &buffer() const { return buffer_; }
/// The following are all convenience overrides.
void append_with_separation(const wcstring &s, separation_type_t type) {
append_with_separation(s.data(), s.size(), type);
}
/// Append a string.
void append(const wcstring &s) { append(s.data(), s.size()); }
void append(const wchar_t *s) { append(s, std::wcslen(s)); }
/// Append a char.
void append(wchar_t s) { append(&s, 1); }
void append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); }
void push_back(wchar_t c) { append(c); }
// Append data from a narrow buffer, widening it.
void append_narrow_buffer(const separated_buffer_t<std::string> &buffer);
void push_back(wchar_t c) { append(c); }
/// Append a format string.
void append_format(const wchar_t *format, ...) {
va_list va;
va_start(va, format);
@ -437,12 +437,61 @@ class output_stream_t {
void append_formatv(const wchar_t *format, va_list va) { append(vformat_string(format, va)); }
// No copying.
output_stream_t(const output_stream_t &s) = delete;
void operator=(const output_stream_t &s) = delete;
output_stream_t() = default;
virtual ~output_stream_t() = default;
};
/// A null output stream which ignores all writes.
class null_output_stream_t final : public output_stream_t {
virtual void append(const wchar_t *s, size_t amt) override;
};
/// An output stream for builtins which outputs to an fd.
/// Note the fd may be something like stdout; there is no ownership implied here.
class fd_output_stream_t final : public output_stream_t {
public:
/// Construct from a file descriptor, which must be nonegative.
explicit fd_output_stream_t(int fd) : fd_(fd) { assert(fd_ >= 0 && "Invalid fd"); }
void append(const wchar_t *s, size_t amt) override;
private:
/// The file descriptor to write to.
const int fd_;
/// Whether we have received an error.
bool errored_{false};
};
/// An output stream for builtins which buffers into a separated buffer.
class buffered_output_stream_t final : public output_stream_t {
public:
explicit buffered_output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {}
void append(const wchar_t *s, size_t amt) override;
void append_with_separation(const wchar_t *s, size_t len, separation_type_t type) override;
wcstring contents() const { return buffer_.newline_serialized(); }
/// Access the buffer.
separated_buffer_t<wcstring> &buffer() { return buffer_; }
const separated_buffer_t<wcstring> &buffer() const { return buffer_; }
const separated_buffer_t<wcstring> *get_separated_buffer() const override { return &buffer_; }
private:
/// Storage for our data.
separated_buffer_t<wcstring> buffer_;
};
struct io_streams_t {
output_stream_t out;
output_stream_t err;
// Streams for out and err.
output_stream_t &out;
output_stream_t &err;
// fd representing stdin. This is not closed by the destructor.
int stdin_fd{-1};
@ -473,7 +522,7 @@ struct io_streams_t {
io_streams_t(const io_streams_t &) = delete;
void operator=(const io_streams_t &) = delete;
explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {}
io_streams_t(output_stream_t &out, output_stream_t &err) : out(out), err(err) {}
};
#endif

View file

@ -388,11 +388,14 @@ end_execution_reason_t parse_execution_context_t::run_function_statement(
return result;
}
trace_if_enabled(*parser, L"function", arguments);
io_streams_t streams(0); // no limit on the amount of output from builtin_function()
// no limit on the amount of output from builtin_function()
buffered_output_stream_t outs(0);
buffered_output_stream_t errs(0);
io_streams_t streams(outs, errs);
int err = builtin_function(*parser, streams, arguments, pstree, statement);
parser->set_last_statuses(statuses_t::just(err));
wcstring errtext = streams.err.contents();
wcstring errtext = errs.contents();
if (!errtext.empty()) {
return this->report_error(err, header, L"%ls", errtext.c_str());
}