Remove bytes usage and update dependencies

This commit is contained in:
Ryan Leckey 2019-12-02 23:26:44 -08:00
parent bf4f65ea2f
commit 9558ab1c50
6 changed files with 36 additions and 34 deletions

View file

@ -28,10 +28,10 @@ uuid = [ "sqlx-core/uuid", "sqlx-macros/uuid" ]
[dependencies]
sqlx-core = { version = "0.1.0-pre", path = "sqlx-core" }
sqlx-macros = { version = "0.1.0-pre", path = "sqlx-macros", optional = true }
proc-macro-hack = { version = "0.5", optional = true }
proc-macro-hack = { version = "0.5.11", optional = true }
[dev-dependencies]
async-std = { version = "1.1.0", features = [ "attributes" ] }
async-std = { version = "1.2.0", features = [ "attributes" ] }
matches = "0.1.8"
criterion = "0.3.0"

View file

@ -5,10 +5,10 @@ edition = "2018"
workspace = "../.."
[dependencies]
anyhow = "1.0.22"
anyhow = "1.0.25"
dotenv = "0.15.0"
async-std = "1.1.0"
async-std = "1.2.0"
tide = "0.4.0"
sqlx = { path = "../..", features = [ "postgres" ] }
serde = { version = "1", features = [ "derive"] }
serde = { version = "1.0.103", features = [ "derive"] }
futures = "0.3.1"

View file

@ -16,11 +16,10 @@ postgres = []
mariadb = []
[dependencies]
async-std = { version = "1.1.0", features = ["attributes", "unstable"] }
async-std = { version = "1.2.0", features = ["attributes", "unstable"] }
async-stream = "0.2.0"
bitflags = "1.2.1"
byteorder = { version = "1.3.2", default-features = false }
bytes = "0.4.12"
futures-channel = "0.3.1"
futures-core = "0.3.1"
futures-util = "0.3.1"
@ -32,3 +31,4 @@ uuid = { version = "0.8.1", optional = true }
[dev-dependencies]
matches = "0.1.8"
bytes = "0.5.2"

View file

@ -2,8 +2,8 @@ use async_std::io::{
prelude::{ReadExt, WriteExt},
Read, Write,
};
use bytes::{BufMut, BytesMut};
use std::io;
use bitflags::_core::mem::MaybeUninit;
pub struct BufStream<S> {
pub(crate) stream: S,
@ -15,19 +15,23 @@ pub struct BufStream<S> {
wbuf: Vec<u8>,
// Buffer used when reading incoming messages
rbuf: BytesMut,
rbuf: Vec<u8>,
rbuf_rindex: usize,
rbuf_windex: usize,
}
impl<S> BufStream<S>
where
S: Read + Write + Unpin,
where
S: Read + Write + Unpin,
{
pub fn new(stream: S) -> Self {
Self {
stream,
stream_eof: false,
wbuf: Vec::with_capacity(1024),
rbuf: BytesMut::with_capacity(8 * 1024),
rbuf: vec![0; 8 * 1024],
rbuf_rindex: 0,
rbuf_windex: 0,
}
}
@ -48,7 +52,7 @@ where
#[inline]
pub fn consume(&mut self, cnt: usize) {
self.rbuf.advance(cnt);
self.rbuf_rindex += cnt;
}
pub async fn peek(&mut self, cnt: usize) -> io::Result<Option<&[u8]>> {
@ -61,25 +65,23 @@ where
// If we have enough bytes in our read buffer,
// return immediately
if self.rbuf.len() >= cnt {
return Ok(Some(&self.rbuf[..cnt]));
if self.rbuf_windex >= (self.rbuf_rindex + cnt) {
return Ok(Some(&self.rbuf[self.rbuf_rindex..(self.rbuf_rindex + cnt)]));
}
if self.rbuf.capacity() < cnt {
// Ask for exactly how much we need with a lower bound of 32-bytes
let needed = (cnt - self.rbuf.capacity()).max(32);
self.rbuf.reserve(needed);
// If we are out of space to write to in the read buffer,
// we reset the indexes
if self.rbuf.len() < (self.rbuf_windex + cnt) {
// TODO: This assumes that all data is consumed when we need to re-allocate
debug_assert_eq!(self.rbuf_rindex, self.rbuf_windex);
self.rbuf_rindex = 0;
self.rbuf_windex = 0;
}
// SAFE: Read data in directly to buffer without zero-initializing the data.
// Postgres is a self-describing format and the TCP frames encode
// length headers. We will never attempt to decode more than we
// received.
let n = self.stream.read(unsafe { self.rbuf.bytes_mut() }).await?;
let n = self.stream.read(&mut self.rbuf[self.rbuf_windex..]).await?;
// SAFE: After we read in N bytes, we can tell the buffer that it actually
// has that many bytes MORE for the decode routines to look at
unsafe { self.rbuf.advance_mut(n) }
self.rbuf_windex += n;
if n == 0 {
self.stream_eof = true;

View file

@ -75,10 +75,12 @@ impl Postgres {
self.stream.flush().await?;
while let Some(message) = self.receive().await? {
println!("recv!?");
match message {
Message::Authentication(auth) => {
match *auth {
protocol::Authentication::Ok => {
println!("no auth?");
// Do nothing. No password is needed to continue.
}
@ -126,6 +128,8 @@ impl Postgres {
}
}
println!("done");
Ok(())
}
@ -221,10 +225,6 @@ impl Postgres {
// Wait and return the next message to be received from Postgres.
pub(super) async fn receive(&mut self) -> crate::Result<Option<Message>> {
// Before we start the receive loop
// Flush any pending data from the send buffer
self.stream.flush().await?;
loop {
// Read the message header (id + len)
let mut header = ret_if_none!(self.stream.peek(5).await?);

View file

@ -8,13 +8,13 @@ edition = "2018"
proc-macro = true
[dependencies]
async-std = "1.1.0"
async-std = "1.2.0"
dotenv = "0.15.0"
futures = "0.3.1"
proc-macro-hack = "0.5"
proc-macro-hack = "0.5.11"
proc-macro2 = "1.0.6"
sqlx = { version = "0.1.0-pre", path = "../sqlx-core", package = "sqlx-core" }
syn = "1.0.8"
syn = "1.0.11"
quote = "1.0.2"
url = "2.1.0"