Improve performance and minor tweaks

This commit is contained in:
Ryan Leckey 2019-06-29 21:31:39 -07:00
parent 85710a16d6
commit 4c90e026f7
9 changed files with 94 additions and 177 deletions

View file

@ -3,7 +3,6 @@ members = [
".",
"sqlx-core",
"sqlx-postgres-protocol",
"sqlx-postgres-mock",
"sqlx-postgres",
]

View file

@ -1,11 +0,0 @@
[package]
name = "sqlx-postgres-mock"
version = "0.0.0"
authors = ["Ryan Leckey <leckey.ryan@gmail.com>"]
license = "MIT OR Apache-2.0"
description = "Mock the PostgreSQL wire protocol. Used internally to test and benchmark sqlx."
edition = "2018"
[dependencies]
runtime = "=0.3.0-alpha.6"
futures-preview = "=0.3.0-alpha.16"

View file

@ -1,41 +0,0 @@
#![feature(async_await)]
use futures::{io::AsyncWriteExt, TryStreamExt};
use runtime::net::TcpListener;
use std::io;
const ESTABLISH: &[u8] = b"\
R\0\0\0\x08\0\0\0\0\
S\0\0\0\x16application_name\0\0\
S\0\0\0\x19client_encoding\0UTF8\0\
S\0\0\0\x17DateStyle\0ISO, MDY\0\
S\0\0\0\x19integer_datetimes\0on\0\
S\0\0\0\x1bIntervalStyle\0iso_8601\0\
S\0\0\0\x14is_superuser\0on\0\
S\0\0\0\x19server_encoding\0UTF8\0\
S\0\0\0\x18server_version\011.2\0\
S\0\0\0#session_authorization\0postgres\0\
S\0\0\0#standard_conforming_strings\0on\0\
S\0\0\0\x11TimeZone\0UTC\0\
K\0\0\0\x0c\0\0'\xc6\x89R\xc5+\
Z\0\0\0\x05I";
#[runtime::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:5433")?;
println!("listening on {}", listener.local_addr()?);
listener
.incoming()
.try_for_each_concurrent(None, async move |mut stream| {
runtime::spawn(async move {
stream.write_all(ESTABLISH).await?;
Ok::<(), std::io::Error>(())
})
.await
})
.await?;
Ok(())
}

View file

@ -20,10 +20,3 @@ md-5 = "0.8.0"
[dev-dependencies]
matches = "0.1.8"
criterion = "0.2.11"
postgres = "0.16.0-rc.1"
runtime-tokio = "=0.3.0-alpha.5"
[[bench]]
name = "connection"
harness = false

View file

@ -1,27 +0,0 @@
#![feature(async_await)]
#[macro_use]
extern crate criterion;
use bytes::Bytes;
use criterion::{black_box, Criterion};
use sqlx_core::ConnectOptions;
use sqlx_postgres::Connection;
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("Connection::establish", |b| {
let options = ConnectOptions::new()
.port(5433) // mock
.user("postgres")
.database("postgres");
b.iter(|| {
runtime::raw::enter(runtime::native::Native, async move {
let _conn = Connection::establish(options).await.unwrap();
});
});
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View file

@ -39,8 +39,8 @@ pub async fn establish<'a, 'b: 'a>(
conn.send(message).await?;
// FIXME: This feels like it could be reduced (see other connection flows)
while let Some(message) = conn.incoming.next().await {
match message {
while let Some(message) = conn.stream.next().await {
match message? {
Message::Authentication(Authentication::Ok) => {
// Do nothing; server is just telling us that
// there is no password needed
@ -73,7 +73,7 @@ pub async fn establish<'a, 'b: 'a>(
break;
}
_ => {
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}

View file

@ -1,28 +1,24 @@
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use futures::{
channel::mpsc,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
SinkExt,
io::{AsyncRead, AsyncWriteExt},
task::{Context, Poll},
Stream,
};
use runtime::{net::TcpStream, task::JoinHandle};
use runtime::net::TcpStream;
use sqlx_core::ConnectOptions;
use sqlx_postgres_protocol::{Encode, Message, Terminate};
use std::io;
use std::{io, pin::Pin};
mod establish;
// mod query;
pub struct Connection {
writer: WriteHalf<TcpStream>,
incoming: mpsc::UnboundedReceiver<Message>,
stream: Framed<TcpStream>,
// Buffer used when serializing outgoing messages
// FIXME: Use BytesMut
wbuf: Vec<u8>,
// Handle to coroutine reading messages from the stream
receiver: JoinHandle<io::Result<()>>,
// Process ID of the Backend
process_id: u32,
@ -33,14 +29,9 @@ pub struct Connection {
impl Connection {
pub async fn establish(options: ConnectOptions<'_>) -> io::Result<Self> {
let stream = TcpStream::connect((options.host, options.port)).await?;
let (reader, writer) = stream.split();
let (tx, rx) = mpsc::unbounded();
let receiver = runtime::spawn(receiver(reader, tx));
let mut conn = Self {
wbuf: Vec::with_capacity(1024),
writer,
receiver,
incoming: rx,
stream: Framed::new(stream),
process_id: 0,
secret_key: 0,
};
@ -56,8 +47,7 @@ impl Connection {
pub async fn close(mut self) -> io::Result<()> {
self.send(Terminate).await?;
self.writer.close().await?;
self.receiver.await?;
self.stream.inner.close().await?;
Ok(())
}
@ -71,77 +61,92 @@ impl Connection {
message.encode(&mut self.wbuf)?;
log::trace!("sending: {:?}", bytes::Bytes::from(self.wbuf.clone()));
self.writer.write_all(&self.wbuf).await?;
self.writer.flush().await?;
self.stream.inner.write_all(&self.wbuf).await?;
self.stream.inner.flush().await?;
Ok(())
}
}
async fn receiver(
mut reader: ReadHalf<TcpStream>,
mut sender: mpsc::UnboundedSender<Message>,
) -> io::Result<()> {
let mut rbuf = BytesMut::with_capacity(1024);
let mut len = 0;
struct Framed<S> {
inner: S,
readable: bool,
eof: bool,
buffer: BytesMut,
}
loop {
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return.
// See: https://github.com/rust-lang-nursery/futures-rs/blob/master/futures-util/src/io/read_to_end.rs#L50-L54
if len == rbuf.len() {
rbuf.reserve(32);
unsafe {
// Set length to the capacity and efficiently
// zero-out the memory
rbuf.set_len(rbuf.capacity());
reader.initializer().initialize(&mut rbuf[len..]);
}
}
// TODO: Need a select! on a channel that I can trigger to cancel this
let cnt = reader.read(&mut rbuf[len..]).await?;
if cnt > 0 {
len += cnt;
} else {
// Read 0 bytes from the server; end-of-stream
break;
}
while len > 0 {
let size = rbuf.len();
let message = Message::decode(&mut rbuf)?;
len -= size - rbuf.len();
match message {
Some(Message::ParameterStatus(body)) => {
log::debug!("parameter: {} = {}", body.name(), body.value());
}
Some(Message::Response(body)) => {
log::warn!("response: {:?}", body);
}
Some(message) => {
sender.send(message).await.unwrap();
}
None => {
// Did not receive enough bytes to
// decode a complete message
break;
}
}
impl<S> Framed<S> {
fn new(stream: S) -> Self {
Self {
readable: false,
eof: false,
inner: stream,
buffer: BytesMut::with_capacity(8 * 1024),
}
}
}
impl<S> Stream for Framed<S>
where
S: AsyncRead + Unpin,
{
type Item = io::Result<Message>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let self_ = Pin::get_mut(self);
loop {
if self_.readable {
if self_.eof {
return Poll::Ready(None);
}
loop {
match Message::decode(&mut self_.buffer)? {
Some(Message::ParameterStatus(_body)) => {
// TODO: Not sure what to do with these but ignore
}
Some(Message::Response(_body)) => {
// TODO: Handle notices and errors
}
Some(message) => {
return Poll::Ready(Some(Ok(message)));
}
None => {
self_.readable = false;
break;
}
}
}
}
self_.buffer.reserve(32);
let n = unsafe {
let b = self_.buffer.bytes_mut();
self_.inner.initializer().initialize(b);
let n = match Pin::new(&mut self_.inner).poll_read(cx, b)? {
Poll::Ready(cnt) => cnt,
Poll::Pending => {
return Poll::Pending;
}
};
self_.buffer.advance_mut(n);
n
};
if n == 0 {
self_.eof = true;
}
self_.readable = true;
}
}
Ok(())
}

View file

@ -14,7 +14,6 @@ pub async fn query<'a, 'b: 'a>(conn: &'a mut Connection, query: &'a str) -> io::
}
ServerMessage::CommandComplete(body) => {
log::debug!("command complete: {}", body.tag()?);
}
_ => {

View file

@ -10,13 +10,13 @@ async fn main() -> io::Result<()> {
let conn = Connection::establish(
ConnectOptions::new()
.host("127.0.0.1")
.port(5433)
.port(5432)
.user("postgres")
.password("password"),
)
.await?;
// conn.close().await?;
conn.close().await?;
Ok(())
}