mirror of
https://github.com/fish-shell/fish-shell
synced 2025-01-01 07:38:46 +00:00
Improve locking discipline in io_buffer_t
Previously we had a lock that was taken in an ad-hoc manner. Switch to using owning_lock.
This commit is contained in:
parent
8bcc8c1a36
commit
258149fe2e
5 changed files with 53 additions and 45 deletions
|
@ -75,11 +75,11 @@ maybe_t<int> builtin_eval(parser_t &parser, io_streams_t &streams, wchar_t **arg
|
||||||
ios.clear();
|
ios.clear();
|
||||||
if (stdout_fill) {
|
if (stdout_fill) {
|
||||||
std::shared_ptr<io_buffer_t> output = io_bufferfill_t::finish(std::move(stdout_fill));
|
std::shared_ptr<io_buffer_t> output = io_bufferfill_t::finish(std::move(stdout_fill));
|
||||||
streams.out.append_narrow_buffer(output->buffer());
|
streams.out.append_narrow_buffer(output->take_buffer());
|
||||||
}
|
}
|
||||||
if (stderr_fill) {
|
if (stderr_fill) {
|
||||||
std::shared_ptr<io_buffer_t> errput = io_bufferfill_t::finish(std::move(stderr_fill));
|
std::shared_ptr<io_buffer_t> errput = io_bufferfill_t::finish(std::move(stderr_fill));
|
||||||
streams.err.append_narrow_buffer(errput->buffer());
|
streams.err.append_narrow_buffer(errput->take_buffer());
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
13
src/exec.cpp
13
src/exec.cpp
|
@ -691,7 +691,7 @@ static launch_result_t exec_block_or_func_process(parser_t &parser, const std::s
|
||||||
// claimed it (background write) or another process has inherited it.
|
// claimed it (background write) or another process has inherited it.
|
||||||
io_chain.remove(block_output_bufferfill);
|
io_chain.remove(block_output_bufferfill);
|
||||||
auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
|
auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
|
||||||
buffer_contents = block_output_buffer->buffer().newline_serialized();
|
buffer_contents = block_output_buffer->take_buffer().newline_serialized();
|
||||||
}
|
}
|
||||||
|
|
||||||
run_internal_process_or_short_circuit(parser, j, p, std::move(buffer_contents),
|
run_internal_process_or_short_circuit(parser, j, p, std::move(buffer_contents),
|
||||||
|
@ -1036,9 +1036,10 @@ bool exec_job(parser_t &parser, const shared_ptr<job_t> &j, const io_chain_t &bl
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Populate \p lst with the output of \p buffer, perhaps splitting lines according to \p split.
|
/// Populate \p lst with the output of \p buffer, perhaps splitting lines according to \p split.
|
||||||
static void populate_subshell_output(wcstring_list_t *lst, const io_buffer_t &buffer, bool split) {
|
static void populate_subshell_output(wcstring_list_t *lst, const separated_buffer_t &buffer,
|
||||||
|
bool split) {
|
||||||
// Walk over all the elements.
|
// Walk over all the elements.
|
||||||
for (const auto &elem : buffer.buffer().elements()) {
|
for (const auto &elem : buffer.elements()) {
|
||||||
if (elem.is_explicitly_separated()) {
|
if (elem.is_explicitly_separated()) {
|
||||||
// Just append this one.
|
// Just append this one.
|
||||||
lst->push_back(str2wcstring(elem.contents));
|
lst->push_back(str2wcstring(elem.contents));
|
||||||
|
@ -1109,8 +1110,8 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser,
|
||||||
return STATUS_CMD_ERROR;
|
return STATUS_CMD_ERROR;
|
||||||
}
|
}
|
||||||
eval_res_t eval_res = parser.eval(cmd, io_chain_t{bufferfill}, job_group, block_type_t::subst);
|
eval_res_t eval_res = parser.eval(cmd, io_chain_t{bufferfill}, job_group, block_type_t::subst);
|
||||||
std::shared_ptr<io_buffer_t> buffer = io_bufferfill_t::finish(std::move(bufferfill));
|
separated_buffer_t buffer = io_bufferfill_t::finish(std::move(bufferfill))->take_buffer();
|
||||||
if (buffer->buffer().discarded()) {
|
if (buffer.discarded()) {
|
||||||
*break_expand = true;
|
*break_expand = true;
|
||||||
return STATUS_READ_TOO_MUCH;
|
return STATUS_READ_TOO_MUCH;
|
||||||
}
|
}
|
||||||
|
@ -1121,7 +1122,7 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lst) {
|
if (lst) {
|
||||||
populate_subshell_output(lst, *buffer, split_output);
|
populate_subshell_output(lst, buffer, split_output);
|
||||||
}
|
}
|
||||||
*break_expand = false;
|
*break_expand = false;
|
||||||
return eval_res.status.status_value();
|
return eval_res.status.status_value();
|
||||||
|
|
|
@ -1302,10 +1302,10 @@ static void test_1_cancellation(const wchar_t *src) {
|
||||||
pthread_kill(thread, SIGINT);
|
pthread_kill(thread, SIGINT);
|
||||||
});
|
});
|
||||||
eval_res_t res = parser_t::principal_parser().eval(src, io_chain_t{filler});
|
eval_res_t res = parser_t::principal_parser().eval(src, io_chain_t{filler});
|
||||||
auto buffer = io_bufferfill_t::finish(std::move(filler));
|
auto buffer = io_bufferfill_t::finish(std::move(filler))->take_buffer();
|
||||||
if (buffer->buffer().size() != 0) {
|
if (buffer.size() != 0) {
|
||||||
err(L"Expected 0 bytes in out_buff, but instead found %lu bytes, for command %ls\n",
|
err(L"Expected 0 bytes in out_buff, but instead found %lu bytes, for command %ls\n",
|
||||||
buffer->buffer().size(), src);
|
buffer.size(), src);
|
||||||
}
|
}
|
||||||
do_test(res.status.signal_exited() && res.status.signal_code() == SIGINT);
|
do_test(res.status.signal_exited() && res.status.signal_code() == SIGINT);
|
||||||
iothread_drain_all();
|
iothread_drain_all();
|
||||||
|
|
27
src/io.cpp
27
src/io.cpp
|
@ -57,23 +57,22 @@ void io_bufferfill_t::print() const {
|
||||||
std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
|
std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t io_buffer_t::read_once(int fd) {
|
ssize_t io_buffer_t::read_once(int fd, acquired_lock<separated_buffer_t> &buffer) {
|
||||||
assert(fd >= 0 && "Invalid fd");
|
assert(fd >= 0 && "Invalid fd");
|
||||||
ASSERT_IS_LOCKED(append_lock_);
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
char buff[4096 * 4];
|
char bytes[4096 * 4];
|
||||||
|
|
||||||
// We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller.
|
// We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller.
|
||||||
ssize_t ret;
|
ssize_t amt;
|
||||||
do {
|
do {
|
||||||
ret = read(fd, buff, sizeof buff);
|
amt = read(fd, bytes, sizeof bytes);
|
||||||
} while (ret < 0 && errno == EINTR);
|
} while (amt < 0 && errno == EINTR);
|
||||||
if (ret < 0 && errno != EAGAIN) {
|
if (amt < 0 && errno != EAGAIN) {
|
||||||
wperror(L"read");
|
wperror(L"read");
|
||||||
} else if (ret > 0) {
|
} else if (amt > 0) {
|
||||||
buffer_.append(buff, static_cast<size_t>(ret));
|
buffer->append(bytes, static_cast<size_t>(amt));
|
||||||
}
|
}
|
||||||
return ret;
|
return amt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void io_buffer_t::begin_filling(autoclose_fd_t fd) {
|
void io_buffer_t::begin_filling(autoclose_fd_t fd) {
|
||||||
|
@ -116,16 +115,16 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) {
|
||||||
bool done = false;
|
bool done = false;
|
||||||
if (reason == item_wake_reason_t::readable) {
|
if (reason == item_wake_reason_t::readable) {
|
||||||
// select() reported us as readable; read a bit.
|
// select() reported us as readable; read a bit.
|
||||||
scoped_lock locker(append_lock_);
|
auto buffer = buffer_.acquire();
|
||||||
ssize_t ret = read_once(fd.fd());
|
ssize_t ret = read_once(fd.fd(), buffer);
|
||||||
done = (ret == 0 || (ret < 0 && errno != EAGAIN));
|
done = (ret == 0 || (ret < 0 && errno != EAGAIN));
|
||||||
} else if (shutdown_fillthread_) {
|
} else if (shutdown_fillthread_) {
|
||||||
// Here our caller asked us to shut down; read while we keep getting data.
|
// Here our caller asked us to shut down; read while we keep getting data.
|
||||||
// This will stop when the fd is closed or if we get EAGAIN.
|
// This will stop when the fd is closed or if we get EAGAIN.
|
||||||
scoped_lock locker(append_lock_);
|
auto buffer = buffer_.acquire();
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
do {
|
do {
|
||||||
ret = read_once(fd.fd());
|
ret = read_once(fd.fd(), buffer);
|
||||||
} while (ret > 0);
|
} while (ret > 0);
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
|
|
48
src/io.h
48
src/io.h
|
@ -74,6 +74,11 @@ class separated_buffer_t {
|
||||||
separated_buffer_t(const separated_buffer_t &) = delete;
|
separated_buffer_t(const separated_buffer_t &) = delete;
|
||||||
void operator=(const separated_buffer_t &) = delete;
|
void operator=(const separated_buffer_t &) = delete;
|
||||||
|
|
||||||
|
/// We may be moved.
|
||||||
|
/// Note this leaves the moved-from value in a bogus state, until clear() is called on it.
|
||||||
|
separated_buffer_t(separated_buffer_t &&rhs) = default;
|
||||||
|
separated_buffer_t &operator=(separated_buffer_t &&) = default;
|
||||||
|
|
||||||
/// Construct a separated_buffer_t with the given buffer limit \p limit, or 0 for no limit.
|
/// Construct a separated_buffer_t with the given buffer limit \p limit, or 0 for no limit.
|
||||||
separated_buffer_t(size_t limit) : buffer_limit_(limit) {}
|
separated_buffer_t(size_t limit) : buffer_limit_(limit) {}
|
||||||
|
|
||||||
|
@ -125,6 +130,13 @@ class separated_buffer_t {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove all elements and unset the discard flag.
|
||||||
|
void clear() {
|
||||||
|
elements_.clear();
|
||||||
|
contents_size_ = 0;
|
||||||
|
discard_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// \return true if our last element has an inferred separation type.
|
/// \return true if our last element has an inferred separation type.
|
||||||
bool last_inferred() const {
|
bool last_inferred() const {
|
||||||
|
@ -146,8 +158,7 @@ class separated_buffer_t {
|
||||||
if (discard_) return false;
|
if (discard_) return false;
|
||||||
size_t proposed_size = contents_size_ + delta;
|
size_t proposed_size = contents_size_ + delta;
|
||||||
if ((proposed_size < delta) || (buffer_limit_ > 0 && proposed_size > buffer_limit_)) {
|
if ((proposed_size < delta) || (buffer_limit_ > 0 && proposed_size > buffer_limit_)) {
|
||||||
elements_.clear();
|
clear();
|
||||||
contents_size_ = 0;
|
|
||||||
discard_ = true;
|
discard_ = true;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -305,29 +316,29 @@ public:
|
||||||
|
|
||||||
~io_buffer_t();
|
~io_buffer_t();
|
||||||
|
|
||||||
/// Access the underlying buffer.
|
/// Take the underlying buffer, transferring ownership to the caller.
|
||||||
/// This requires that the background fillthread be none.
|
/// This should only be called after the fillthread operation is complete.
|
||||||
const separated_buffer_t &buffer() const {
|
separated_buffer_t take_buffer() {
|
||||||
assert(!fillthread_running() && "Cannot access buffer during background fill");
|
assert(!fillthread_running() && "Cannot access buffer during background fill");
|
||||||
return buffer_;
|
auto locked_buff = buffer_.acquire();
|
||||||
|
separated_buffer_t result = std::move(*locked_buff);
|
||||||
|
locked_buff->clear();
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a string to the buffer.
|
/// Append a string to the buffer.
|
||||||
void append(std::string &&str, separation_type_t type = separation_type_t::inferred) {
|
void append(std::string &&str, separation_type_t type = separation_type_t::inferred) {
|
||||||
scoped_lock locker(append_lock_);
|
buffer_.acquire()->append(std::move(str), type);
|
||||||
buffer_.append(std::move(str), type);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// \return true if output was discarded due to exceeding the read limit.
|
/// \return true if output was discarded due to exceeding the read limit.
|
||||||
bool discarded() const {
|
bool discarded() { return buffer_.acquire()->discarded(); }
|
||||||
scoped_lock locker(append_lock_);
|
|
||||||
return buffer_.discarded();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Read some, filling the buffer. The append lock must be held.
|
/// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is
|
||||||
/// \return positive on success, 0 if closed, -1 on error (in which case errno will be set).
|
/// held. \return positive on success, 0 if closed, -1 on error (in which case errno will be
|
||||||
ssize_t read_once(int fd);
|
/// set).
|
||||||
|
ssize_t read_once(int fd, acquired_lock<separated_buffer_t> &buff);
|
||||||
|
|
||||||
/// Begin the fill operation, reading from the given fd in the background.
|
/// Begin the fill operation, reading from the given fd in the background.
|
||||||
void begin_filling(autoclose_fd_t readfd);
|
void begin_filling(autoclose_fd_t readfd);
|
||||||
|
@ -338,10 +349,8 @@ public:
|
||||||
/// Helper to return whether the fillthread is running.
|
/// Helper to return whether the fillthread is running.
|
||||||
bool fillthread_running() const { return fillthread_waiter_.valid(); }
|
bool fillthread_running() const { return fillthread_waiter_.valid(); }
|
||||||
|
|
||||||
friend io_bufferfill_t;
|
|
||||||
|
|
||||||
/// Buffer storing what we have read.
|
/// Buffer storing what we have read.
|
||||||
separated_buffer_t buffer_;
|
owning_lock<separated_buffer_t> buffer_;
|
||||||
|
|
||||||
/// Atomic flag indicating our fillthread should shut down.
|
/// Atomic flag indicating our fillthread should shut down.
|
||||||
relaxed_atomic_bool_t shutdown_fillthread_{false};
|
relaxed_atomic_bool_t shutdown_fillthread_{false};
|
||||||
|
@ -353,8 +362,7 @@ public:
|
||||||
/// The item id of our background fillthread fd monitor item.
|
/// The item id of our background fillthread fd monitor item.
|
||||||
uint64_t item_id_{0};
|
uint64_t item_id_{0};
|
||||||
|
|
||||||
/// Lock for appending. Mutable since we take it in const functions.
|
friend io_bufferfill_t;
|
||||||
mutable std::mutex append_lock_{};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
using io_data_ref_t = std::shared_ptr<const io_data_t>;
|
using io_data_ref_t = std::shared_ptr<const io_data_t>;
|
||||||
|
|
Loading…
Reference in a new issue