mirror of
https://github.com/fish-shell/fish-shell
synced 2025-01-01 07:38:46 +00:00
Do not buffer builtin output if avoidable
builtins output to stdout and stderr via io_streams_t. Prior to this fix, it contained an output_stream_t which just wraps a buffer. So all builtin output went to this buffer (except for eval). Switch output_stream_t to become a new abstract class which can output to a buffer, file descriptor, or nowhere. This allows for example `string` to stream its output as it is produced, instead of buffering it.
This commit is contained in:
parent
68092c5d21
commit
bcfc54fdaa
7 changed files with 174 additions and 51 deletions
|
@ -20,6 +20,7 @@ Notable improvements and fixes
|
|||
- A new ``fish_add_path`` helper function to add paths to $PATH without producing duplicates, to be used interactively or in ``config.fish`` (#6960).
|
||||
- ``fish_preexec`` and ``fish_postexec`` events are no longer triggered for empty commands.
|
||||
- The ``test`` builtin now better shows where an error occured (#6030).
|
||||
- builtins may now output before all data is read. For example, `string replace` no longer has to read all of stdin before it can begin to output.
|
||||
|
||||
Syntax changes and new commands
|
||||
-------------------------------
|
||||
|
|
|
@ -1185,7 +1185,6 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc
|
|||
// Remove the last element if it is empty.
|
||||
if (splits.back().empty()) splits.pop_back();
|
||||
}
|
||||
auto &buff = streams.out.buffer();
|
||||
if (opts.fields.size() > 0) {
|
||||
// Print nothing and return error if any of the supplied
|
||||
// fields do not exist, unless `--allow-empty` is used.
|
||||
|
@ -1199,12 +1198,13 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc
|
|||
}
|
||||
for (const auto &field : opts.fields) {
|
||||
if (field - 1 < (long)splits.size()) {
|
||||
buff.append(splits.at(field - 1), separation_type_t::explicitly);
|
||||
streams.out.append_with_separation(splits.at(field - 1),
|
||||
separation_type_t::explicitly);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const wcstring &split : splits) {
|
||||
buff.append(split, separation_type_t::explicitly);
|
||||
streams.out.append_with_separation(split, separation_type_t::explicitly);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1228,20 +1228,21 @@ static int string_collect(parser_t &parser, io_streams_t &streams, int argc, wch
|
|||
int retval = parse_opts(&opts, &optind, 0, argc, argv, parser, streams);
|
||||
if (retval != STATUS_CMD_OK) return retval;
|
||||
|
||||
auto &buff = streams.out.buffer();
|
||||
arg_iterator_t aiter(argv, optind, streams, /* don't split */ false);
|
||||
size_t appended = 0;
|
||||
while (const wcstring *arg = aiter.nextstr()) {
|
||||
auto begin = arg->cbegin(), end = arg->cend();
|
||||
const wchar_t *s = arg->c_str();
|
||||
size_t len = arg->size();
|
||||
if (!opts.no_trim_newlines) {
|
||||
while (end > begin && *(end - 1) == L'\n') {
|
||||
--end;
|
||||
while (len > 0 && s[len - 1] == L'\n') {
|
||||
len -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
buff.append(begin, end, separation_type_t::explicitly);
|
||||
streams.out.append_with_separation(s, len, separation_type_t::explicitly);
|
||||
appended += len;
|
||||
}
|
||||
|
||||
return buff.size() > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR;
|
||||
return appended > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR;
|
||||
}
|
||||
|
||||
// Helper function to abstract the repeat logic from string_repeat
|
||||
|
|
56
src/exec.cpp
56
src/exec.cpp
|
@ -419,17 +419,47 @@ static bool exec_internal_builtin_proc(parser_t &parser, process_t *p, const io_
|
|||
return true; // "success"
|
||||
}
|
||||
|
||||
/// \return an newly allocated output stream for the given fd, which is typically stdout or stderr.
|
||||
/// This inspects the io_chain and decides what sort of output stream to return.
|
||||
static std::unique_ptr<output_stream_t> create_output_stream_for_builtin(const parser_t &parser,
|
||||
const io_chain_t &io_chain,
|
||||
int fd) {
|
||||
const shared_ptr<const io_data_t> io = io_chain.io_for_fd(fd);
|
||||
if (io == nullptr) {
|
||||
// Common case of no redirections.
|
||||
// Just write to the fd directly.
|
||||
return make_unique<fd_output_stream_t>(fd);
|
||||
}
|
||||
switch (io->io_mode) {
|
||||
case io_mode_t::bufferfill:
|
||||
// Write to a buffer.
|
||||
return make_unique<buffered_output_stream_t>(parser.libdata().read_limit);
|
||||
|
||||
case io_mode_t::close:
|
||||
return make_unique<null_output_stream_t>();
|
||||
|
||||
// TODO: reconsider these.
|
||||
case io_mode_t::file:
|
||||
case io_mode_t::pipe:
|
||||
case io_mode_t::fd:
|
||||
return make_unique<buffered_output_stream_t>(parser.libdata().read_limit);
|
||||
}
|
||||
DIE("Unreachable");
|
||||
}
|
||||
|
||||
/// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections
|
||||
/// given in io_chain.
|
||||
static bool handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t> &j, process_t *p,
|
||||
io_chain_t *io_chain, const io_streams_t &builtin_io_streams) {
|
||||
assert(p->type == process_type_t::builtin && "Process is not a builtin");
|
||||
|
||||
const output_stream_t &stdout_stream = builtin_io_streams.out;
|
||||
const output_stream_t &stderr_stream = builtin_io_streams.err;
|
||||
const separated_buffer_t<wcstring> *output_buffer =
|
||||
builtin_io_streams.out.get_separated_buffer();
|
||||
const separated_buffer_t<wcstring> *errput_buffer =
|
||||
builtin_io_streams.err.get_separated_buffer();
|
||||
|
||||
// Mark if we discarded output.
|
||||
if (stdout_stream.buffer().discarded()) {
|
||||
if (output_buffer && output_buffer->discarded()) {
|
||||
p->status = proc_status_t::from_exit_code(STATUS_READ_TOO_MUCH);
|
||||
}
|
||||
|
||||
|
@ -446,15 +476,21 @@ static bool handle_builtin_output(parser_t &parser, const std::shared_ptr<job_t>
|
|||
// doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no
|
||||
// need for a similar check for stderr.
|
||||
bool stdout_done = false;
|
||||
if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
|
||||
if (output_buffer && stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
|
||||
auto stdout_buffer = dynamic_cast<const io_bufferfill_t *>(stdout_io.get())->buffer();
|
||||
stdout_buffer->append_from_stream(stdout_stream);
|
||||
stdout_buffer->append_from_wide_buffer(*output_buffer);
|
||||
stdout_done = true;
|
||||
}
|
||||
|
||||
// Figure out any data remaining to write. We may have none in which case we can short-circuit.
|
||||
std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents());
|
||||
std::string errbuff = wcs2string(stderr_stream.contents());
|
||||
std::string outbuff;
|
||||
if (output_buffer && !stdout_done) {
|
||||
outbuff = wcs2string(output_buffer->newline_serialized());
|
||||
}
|
||||
std::string errbuff;
|
||||
if (errput_buffer) {
|
||||
errbuff = wcs2string(errput_buffer->newline_serialized());
|
||||
}
|
||||
|
||||
// If we have no redirections for stdout/stderr, just write them directly.
|
||||
if (!stdout_io && !stderr_io) {
|
||||
|
@ -819,7 +855,11 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, const std::share
|
|||
}
|
||||
|
||||
case process_type_t::builtin: {
|
||||
io_streams_t builtin_io_streams{parser.libdata().read_limit};
|
||||
std::unique_ptr<output_stream_t> output_stream =
|
||||
create_output_stream_for_builtin(parser, process_net_io_chain, STDOUT_FILENO);
|
||||
std::unique_ptr<output_stream_t> errput_stream =
|
||||
create_output_stream_for_builtin(parser, process_net_io_chain, STDERR_FILENO);
|
||||
io_streams_t builtin_io_streams{*output_stream, *errput_stream};
|
||||
builtin_io_streams.job_group = j->group;
|
||||
if (!exec_internal_builtin_proc(parser, p, pipe_read.get(), process_net_io_chain,
|
||||
builtin_io_streams)) {
|
||||
|
|
|
@ -2533,7 +2533,8 @@ static bool run_one_test_test(int expected, wcstring_list_t &lst, bool bracket)
|
|||
i++;
|
||||
}
|
||||
argv[i + 1] = NULL;
|
||||
io_streams_t streams(0);
|
||||
null_output_stream_t null{};
|
||||
io_streams_t streams(null, null);
|
||||
int result = builtin_test(parser, streams, argv);
|
||||
|
||||
if (expected != result) err(L"expected builtin_test() to return %d, got %d", expected, result);
|
||||
|
@ -2565,7 +2566,8 @@ static bool run_test_test(int expected, const wcstring &str) {
|
|||
static void test_test_brackets() {
|
||||
// Ensure [ knows it needs a ].
|
||||
parser_t &parser = parser_t::principal_parser();
|
||||
io_streams_t streams(0);
|
||||
null_output_stream_t null{};
|
||||
io_streams_t streams(null, null);
|
||||
|
||||
null_terminated_array_t<wchar_t> args;
|
||||
|
||||
|
@ -5100,7 +5102,9 @@ int builtin_string(parser_t &parser, io_streams_t &streams, wchar_t **argv);
|
|||
static void run_one_string_test(const wchar_t *const *argv, int expected_rc,
|
||||
const wchar_t *expected_out) {
|
||||
parser_t &parser = parser_t::principal_parser();
|
||||
io_streams_t streams(0);
|
||||
buffered_output_stream_t outs{0};
|
||||
null_output_stream_t errs{};
|
||||
io_streams_t streams(outs, errs);
|
||||
streams.stdin_is_directly_redirected = false; // read from argv instead of stdin
|
||||
int rc = builtin_string(parser, streams, const_cast<wchar_t **>(argv));
|
||||
wcstring args;
|
||||
|
@ -5111,10 +5115,10 @@ static void run_one_string_test(const wchar_t *const *argv, int expected_rc,
|
|||
if (rc != expected_rc) {
|
||||
err(L"Test failed on line %lu: [%ls]: expected return code %d but got %d", __LINE__,
|
||||
args.c_str(), expected_rc, rc);
|
||||
} else if (streams.out.contents() != expected_out) {
|
||||
} else if (outs.contents() != expected_out) {
|
||||
err(L"Test failed on line %lu: [%ls]: expected [%ls] but got [%ls]", __LINE__, args.c_str(),
|
||||
escape_string(expected_out, ESCAPE_ALL).c_str(),
|
||||
escape_string(streams.out.contents(), ESCAPE_ALL).c_str());
|
||||
escape_string(outs.contents(), ESCAPE_ALL).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
31
src/io.cpp
31
src/io.cpp
|
@ -57,8 +57,7 @@ void io_bufferfill_t::print() const {
|
|||
std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
|
||||
}
|
||||
|
||||
void io_buffer_t::append_from_stream(const output_stream_t &stream) {
|
||||
const separated_buffer_t<wcstring> &input = stream.buffer();
|
||||
void io_buffer_t::append_from_wide_buffer(const separated_buffer_t<wcstring> &input) {
|
||||
if (input.elements().empty() && !input.discarded()) return;
|
||||
scoped_lock locker(append_lock_);
|
||||
if (buffer_.discarded()) return;
|
||||
|
@ -349,6 +348,32 @@ shared_ptr<const io_data_t> io_chain_t::io_for_fd(int fd) const {
|
|||
|
||||
void output_stream_t::append_narrow_buffer(const separated_buffer_t<std::string> &buffer) {
|
||||
for (const auto &rhs_elem : buffer.elements()) {
|
||||
buffer_.append(str2wcstring(rhs_elem.contents), rhs_elem.separation);
|
||||
append_with_separation(str2wcstring(rhs_elem.contents), rhs_elem.separation);
|
||||
}
|
||||
}
|
||||
|
||||
void output_stream_t::append_with_separation(const wchar_t *s, size_t len, separation_type_t type) {
|
||||
append(s, len);
|
||||
if (type == separation_type_t::explicitly) {
|
||||
append(L'\n');
|
||||
}
|
||||
}
|
||||
|
||||
void fd_output_stream_t::append(const wchar_t *s, size_t amt) {
|
||||
if (errored_) return;
|
||||
int res = wwrite_to_fd(s, amt, this->fd_);
|
||||
if (res < 0) {
|
||||
// TODO: this error is too aggressive, e.g. if we got SIGINT we should not complain.
|
||||
wperror(L"write");
|
||||
errored_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
void null_output_stream_t::append(const wchar_t *, size_t) {}
|
||||
|
||||
void buffered_output_stream_t::append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); }
|
||||
|
||||
void buffered_output_stream_t::append_with_separation(const wchar_t *s, size_t len,
|
||||
separation_type_t type) {
|
||||
buffer_.append(s, s + len, type);
|
||||
}
|
||||
|
|
95
src/io.h
95
src/io.h
|
@ -344,9 +344,9 @@ class io_buffer_t {
|
|||
buffer_.append(ptr, ptr + count);
|
||||
}
|
||||
|
||||
/// Appends data from a given output_stream_t.
|
||||
/// Marks the receiver as discarded if the stream was discarded.
|
||||
void append_from_stream(const output_stream_t &stream);
|
||||
/// Appends data from a given separated buffer.
|
||||
/// Marks the receiver as discarded if the buffer was discarded.
|
||||
void append_from_wide_buffer(const separated_buffer_t<wcstring> &input);
|
||||
};
|
||||
|
||||
using io_data_ref_t = std::shared_ptr<const io_data_t>;
|
||||
|
@ -398,36 +398,36 @@ maybe_t<autoclose_pipes_t> make_autoclose_pipes(const fd_set_t &fdset);
|
|||
/// cloexec. \returns invalid fd on failure (in which case the given fd is still closed).
|
||||
autoclose_fd_t move_fd_to_unused(autoclose_fd_t fd, const fd_set_t &fdset);
|
||||
|
||||
/// Class representing the output that a builtin can generate.
|
||||
/// Base class representing the output that a builtin can generate.
|
||||
/// This has various subclasses depending on the ultimate output destination.
|
||||
class output_stream_t {
|
||||
private:
|
||||
/// Storage for our data.
|
||||
separated_buffer_t<wcstring> buffer_;
|
||||
|
||||
// No copying.
|
||||
output_stream_t(const output_stream_t &s) = delete;
|
||||
void operator=(const output_stream_t &s) = delete;
|
||||
|
||||
public:
|
||||
output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {}
|
||||
/// Required override point. The output stream receives a string \p s with \p amt chars.
|
||||
virtual void append(const wchar_t *s, size_t amt) = 0;
|
||||
|
||||
void append(const wcstring &s) { buffer_.append(s.begin(), s.end()); }
|
||||
/// \return the separated buffer if this holds one, otherwise nullptr.
|
||||
virtual const separated_buffer_t<wcstring> *get_separated_buffer() const { return nullptr; }
|
||||
|
||||
separated_buffer_t<wcstring> &buffer() { return buffer_; }
|
||||
/// An optional override point. This is for explicit separation.
|
||||
virtual void append_with_separation(const wchar_t *s, size_t len, separation_type_t type);
|
||||
|
||||
const separated_buffer_t<wcstring> &buffer() const { return buffer_; }
|
||||
/// The following are all convenience overrides.
|
||||
void append_with_separation(const wcstring &s, separation_type_t type) {
|
||||
append_with_separation(s.data(), s.size(), type);
|
||||
}
|
||||
|
||||
/// Append a string.
|
||||
void append(const wcstring &s) { append(s.data(), s.size()); }
|
||||
void append(const wchar_t *s) { append(s, std::wcslen(s)); }
|
||||
|
||||
/// Append a char.
|
||||
void append(wchar_t s) { append(&s, 1); }
|
||||
|
||||
void append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); }
|
||||
void push_back(wchar_t c) { append(c); }
|
||||
|
||||
// Append data from a narrow buffer, widening it.
|
||||
void append_narrow_buffer(const separated_buffer_t<std::string> &buffer);
|
||||
|
||||
void push_back(wchar_t c) { append(c); }
|
||||
|
||||
/// Append a format string.
|
||||
void append_format(const wchar_t *format, ...) {
|
||||
va_list va;
|
||||
va_start(va, format);
|
||||
|
@ -437,12 +437,61 @@ class output_stream_t {
|
|||
|
||||
void append_formatv(const wchar_t *format, va_list va) { append(vformat_string(format, va)); }
|
||||
|
||||
// No copying.
|
||||
output_stream_t(const output_stream_t &s) = delete;
|
||||
void operator=(const output_stream_t &s) = delete;
|
||||
|
||||
output_stream_t() = default;
|
||||
virtual ~output_stream_t() = default;
|
||||
};
|
||||
|
||||
/// A null output stream which ignores all writes.
|
||||
class null_output_stream_t final : public output_stream_t {
|
||||
virtual void append(const wchar_t *s, size_t amt) override;
|
||||
};
|
||||
|
||||
/// An output stream for builtins which outputs to an fd.
|
||||
/// Note the fd may be something like stdout; there is no ownership implied here.
|
||||
class fd_output_stream_t final : public output_stream_t {
|
||||
public:
|
||||
/// Construct from a file descriptor, which must be nonegative.
|
||||
explicit fd_output_stream_t(int fd) : fd_(fd) { assert(fd_ >= 0 && "Invalid fd"); }
|
||||
|
||||
void append(const wchar_t *s, size_t amt) override;
|
||||
|
||||
private:
|
||||
/// The file descriptor to write to.
|
||||
const int fd_;
|
||||
|
||||
/// Whether we have received an error.
|
||||
bool errored_{false};
|
||||
};
|
||||
|
||||
/// An output stream for builtins which buffers into a separated buffer.
|
||||
class buffered_output_stream_t final : public output_stream_t {
|
||||
public:
|
||||
explicit buffered_output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {}
|
||||
|
||||
void append(const wchar_t *s, size_t amt) override;
|
||||
void append_with_separation(const wchar_t *s, size_t len, separation_type_t type) override;
|
||||
|
||||
wcstring contents() const { return buffer_.newline_serialized(); }
|
||||
|
||||
/// Access the buffer.
|
||||
separated_buffer_t<wcstring> &buffer() { return buffer_; }
|
||||
const separated_buffer_t<wcstring> &buffer() const { return buffer_; }
|
||||
|
||||
const separated_buffer_t<wcstring> *get_separated_buffer() const override { return &buffer_; }
|
||||
|
||||
private:
|
||||
/// Storage for our data.
|
||||
separated_buffer_t<wcstring> buffer_;
|
||||
};
|
||||
|
||||
struct io_streams_t {
|
||||
output_stream_t out;
|
||||
output_stream_t err;
|
||||
// Streams for out and err.
|
||||
output_stream_t &out;
|
||||
output_stream_t &err;
|
||||
|
||||
// fd representing stdin. This is not closed by the destructor.
|
||||
int stdin_fd{-1};
|
||||
|
@ -473,7 +522,7 @@ struct io_streams_t {
|
|||
io_streams_t(const io_streams_t &) = delete;
|
||||
void operator=(const io_streams_t &) = delete;
|
||||
|
||||
explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {}
|
||||
io_streams_t(output_stream_t &out, output_stream_t &err) : out(out), err(err) {}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -388,11 +388,14 @@ end_execution_reason_t parse_execution_context_t::run_function_statement(
|
|||
return result;
|
||||
}
|
||||
trace_if_enabled(*parser, L"function", arguments);
|
||||
io_streams_t streams(0); // no limit on the amount of output from builtin_function()
|
||||
// no limit on the amount of output from builtin_function()
|
||||
buffered_output_stream_t outs(0);
|
||||
buffered_output_stream_t errs(0);
|
||||
io_streams_t streams(outs, errs);
|
||||
int err = builtin_function(*parser, streams, arguments, pstree, statement);
|
||||
parser->set_last_statuses(statuses_t::just(err));
|
||||
|
||||
wcstring errtext = streams.err.contents();
|
||||
wcstring errtext = errs.contents();
|
||||
if (!errtext.empty()) {
|
||||
return this->report_error(err, header, L"%ls", errtext.c_str());
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue