Finish reallocation strategy for BufStream

Fixes #30
This commit is contained in:
Ryan Leckey 2020-01-06 10:53:56 -08:00
parent 5c532a8ecc
commit 92f3f8cf6f
2 changed files with 51 additions and 7 deletions

View file

@ -4,6 +4,8 @@ use async_std::io::{
};
use std::io;
const RBUF_SIZE: usize = 8 * 1024;
pub struct BufStream<S> {
pub(crate) stream: S,
@ -28,7 +30,7 @@ where
stream,
stream_eof: false,
wbuf: Vec::with_capacity(1024),
rbuf: vec![0; 8 * 1024],
rbuf: vec![0; RBUF_SIZE],
rbuf_rindex: 0,
rbuf_windex: 0,
}
@ -70,14 +72,39 @@ where
return Ok(Some(buf));
}
// If we are out of space to write to in the read buffer,
// we reset the indexes
// If we are out of space to write to in the read buffer ..
if self.rbuf.len() < (self.rbuf_windex + cnt) {
// TODO: This assumes that all data is consumed when we need to re-allocate
debug_assert_eq!(self.rbuf_rindex, self.rbuf_windex);
if self.rbuf_rindex == self.rbuf_windex {
// We have consumed all data; simply reset the indexes
self.rbuf_rindex = 0;
self.rbuf_windex = 0;
} else {
// Allocate a new buffer
let mut new_rbuf = Vec::with_capacity(RBUF_SIZE);
self.rbuf_rindex = 0;
self.rbuf_windex = 0;
// Take the minimum of the read and write indexes
let min_index = self.rbuf_rindex.min(self.rbuf_windex);
// Copy the old buffer to our new buffer
new_rbuf.extend_from_slice(&self.rbuf[min_index..]);
// Zero-extend the new buffer
new_rbuf.resize(new_rbuf.capacity(), 0);
// Replace the old buffer with our new buffer
self.rbuf = new_rbuf;
// And reduce the indexes
self.rbuf_rindex -= min_index;
self.rbuf_windex -= min_index;
}
// Do we need more space still
if self.rbuf.len() < (self.rbuf_windex + cnt) {
let needed = (self.rbuf_windex + cnt) - self.rbuf.len();
self.rbuf.resize(self.rbuf.len() + needed, 0);
}
}
let n = self.stream.read(&mut self.rbuf[self.rbuf_windex..]).await?;

View file

@ -72,6 +72,23 @@ CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY);
Ok(())
}
#[async_std::test]
async fn it_remains_stable_issue_30() -> anyhow::Result<()> {
let mut conn = connect().await?;
// This tests the internal buffer wrapping around at the end
// Specifically: https://github.com/launchbadge/sqlx/issues/30
let rows = sqlx::query("SELECT i, random()::text FROM generate_series(1, 1000) as i")
.fetch_all(&mut conn)
.await?;
assert_eq!(rows.len(), 1000);
assert_eq!(rows[rows.len() - 1].get::<i32, _>(0), 1000);
Ok(())
}
async fn connect() -> anyhow::Result<PgConnection> {
Ok(PgConnection::open(dotenv::var("DATABASE_URL")?).await?)
}