use pin-project and fix impl of GuardedFlush

This commit is contained in:
Ryan Leckey 2020-03-09 08:52:21 -07:00
parent f3fe264478
commit 672f83c00e
7 changed files with 42 additions and 16 deletions

21
Cargo.lock generated
View file

@ -1097,6 +1097,24 @@ name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pin-project"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pin-project-internal"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pin-project-lite"
version = "0.1.4"
@ -1492,6 +1510,7 @@ dependencies = [
"memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2183,6 +2202,8 @@ dependencies = [
"checksum paw-raw 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7f0b59668fe80c5afe998f0c0bf93322bf2cd66cafeeb80581f291716f3467f2"
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
"checksum pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c"
"checksum pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f"
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"

View file

@ -49,6 +49,7 @@ sha2 = { version = "0.8.1", default-features = false, optional = true }
tokio = { version = "0.2.13", default-features = false, features = [ "dns", "fs", "time", "tcp" ], optional = true }
url = { version = "2.1.1", default-features = false }
uuid = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] }
pin-project = "0.4"
[dev-dependencies]
matches = "0.1.8"

View file

@ -5,8 +5,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::ready;
use pin_project::{pin_project, pinned_drop};
use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite};
const RBUF_SIZE: usize = 8 * 1024;
@ -25,7 +26,9 @@ pub struct BufStream<S> {
rbuf_windex: usize,
}
#[pin_project(PinnedDrop)]
pub struct GuardedFlush<'a, S: 'a> {
#[pin]
stream: &'a mut S,
buf: io::Cursor<&'a mut Vec<u8>>,
}
@ -170,23 +173,26 @@ impl<'a, S: AsyncWrite + Unpin> Future for GuardedFlush<'a, S> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let buf = self.buf.fill_buf()?;
let buf = this.buf.fill_buf()?;
if !buf.is_empty() {
let written = ready!(self.stream.poll_write(cx)?);
self.buf.consume(written);
let written = ready!(this.stream.as_mut().poll_write(cx, buf)?);
this.buf.consume(written);
} else {
break;
}
}
self.stream.poll_flush(cx)
this.stream.poll_flush(cx)
}
}
impl<'a, S> Drop for GuardedFlush<'a, S> {
fn drop(&mut self) {
#[pinned_drop]
impl<'a, S> PinnedDrop for GuardedFlush<'a, S> {
fn drop(mut self: Pin<&mut Self>) {
// clear the buffer regardless of whether the flush succeeded or not
self.buf.get_mut().clear();
}

View file

@ -244,7 +244,7 @@ impl PgConnection {
crate::query::query(&query)
.bind_all(args)
.map(|row: PgRow| -> crate::Result<(u32, SharedStr)> {
.try_map(|row: PgRow| -> crate::Result<(u32, SharedStr)> {
Ok((
row.get::<i32, _>(0)? as u32,
row.get::<String, _>(1)?.into(),
@ -295,7 +295,7 @@ impl PgConnection {
crate::query::query(&query)
.bind_all(args)
.map(|row: PgRow| {
.try_map(|row: PgRow| {
let idx = row.get::<i32, _>(0)?;
let non_null = row.get::<Option<bool>, _>(1)?;
@ -405,5 +405,3 @@ impl<'c> RefExecutor<'c> for &'c mut super::PgConnection {
PgCursor::from_connection(self, query)
}
}
impl_execute_for_query!(Postgres);

View file

@ -98,7 +98,7 @@ pub fn quote_query_as<DB: DatabaseExt>(
let row_path = DB::row_path();
quote! {
sqlx::query::<#db_path>(#sql).bind_all(#bind_args).map(|row: #row_path| {
sqlx::query::<#db_path>(#sql).bind_all(#bind_args).try_map(|row: #row_path| {
use sqlx::Row as _;
use sqlx::result_ext::ResultExt as _;

View file

@ -7,7 +7,7 @@ async fn test_query() -> anyhow::Result<()> {
let account = sqlx::query!(
"SELECT * from (VALUES (1, 'Herp Derpinson')) accounts(id, name) where id = $1",
1i32,
1i32
)
.fetch_one(&mut conn)
.await?;

View file

@ -9,7 +9,7 @@ async fn it_connects() -> anyhow::Result<()> {
let mut conn = connect().await?;
let value = sqlx::query("select 1 + 1")
.map(|row: PgRow| row.get::<i32, _>(0))
.try_map(|row: PgRow| row.get::<i32, _>(0))
.fetch_one(&mut conn)
.await?;
@ -41,7 +41,7 @@ CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY);
}
let sum: i32 = sqlx::query("SELECT id FROM users")
.map(|row: PgRow| row.get::<i32, _>(0))
.try_map(|row: PgRow| row.get::<i32, _>(0))
.fetch(&mut conn)
.try_fold(0_i32, |acc, x| async move { Ok(acc + x) })
.await?;
@ -59,7 +59,7 @@ async fn it_can_return_interleaved_nulls_issue_104() -> anyhow::Result<()> {
let tuple =
sqlx::query("SELECT NULL::INT, 10::INT, NULL, 20::INT, NULL, 40::INT, NULL, 80::INT")
.map(|row: PgRow| {
.try_map(|row: PgRow| {
Ok((
row.get::<Option<i32>, _>(0)?,
row.get::<Option<i32>, _>(1)?,