mirror of
https://github.com/launchbadge/sqlx
synced 2024-11-10 14:34:19 +00:00
Fix bug when a read on a BufStream is cancelled.
This commit is contained in:
parent
c96bcd9f6b
commit
01bef75cb9
1 changed files with 36 additions and 11 deletions
|
@ -104,21 +104,48 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// Holds a buffer which has been temporarily extended, so that
|
||||
// we can read into it. Automatically shrinks the buffer back
|
||||
// down if the read is cancelled.
|
||||
struct BufTruncator<'a> {
|
||||
buf: &'a mut BytesMut,
|
||||
filled_len: usize,
|
||||
}
|
||||
|
||||
impl<'a> BufTruncator<'a> {
|
||||
fn new(buf: &'a mut BytesMut) -> Self {
|
||||
let filled_len = buf.len();
|
||||
Self { buf, filled_len }
|
||||
}
|
||||
fn reserve(&mut self, space: usize) {
|
||||
self.buf.resize(self.filled_len + space, 0);
|
||||
}
|
||||
async fn read<S: AsyncRead + Unpin>(&mut self, stream: &mut S) -> Result<usize, Error> {
|
||||
let n = stream.read(&mut self.buf[self.filled_len..]).await?;
|
||||
self.filled_len += n;
|
||||
Ok(n)
|
||||
}
|
||||
fn is_full(&self) -> bool {
|
||||
self.filled_len >= self.buf.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for BufTruncator<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.buf.truncate(self.filled_len);
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_raw_into<S: AsyncRead + Unpin>(
|
||||
stream: &mut S,
|
||||
buf: &mut BytesMut,
|
||||
cnt: usize,
|
||||
) -> Result<(), Error> {
|
||||
let offset = buf.len();
|
||||
let mut buf = BufTruncator::new(buf);
|
||||
buf.reserve(cnt);
|
||||
|
||||
// zero-fills the space in the read buffer
|
||||
buf.resize(offset + cnt, 0);
|
||||
|
||||
let mut read = offset;
|
||||
while (offset + cnt) > read {
|
||||
// read in bytes from the stream into the read buffer starting
|
||||
// from the offset we last read from
|
||||
let n = stream.read(&mut buf[read..]).await?;
|
||||
while !buf.is_full() {
|
||||
let n = buf.read(stream).await?;
|
||||
|
||||
if n == 0 {
|
||||
// a zero read when we had space in the read buffer
|
||||
|
@ -128,8 +155,6 @@ async fn read_raw_into<S: AsyncRead + Unpin>(
|
|||
|
||||
return Err(io::Error::from(io::ErrorKind::ConnectionAborted).into());
|
||||
}
|
||||
|
||||
read += n;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in a new issue