Fix a couple minor nits with Pool and run rustfmt

This commit is contained in:
Ryan Leckey 2019-12-27 22:47:25 -08:00
parent a23bfb60eb
commit 9b0f34b0ce
15 changed files with 226 additions and 140 deletions

View file

@ -30,8 +30,8 @@ use crate::url::Url;
// used in tests and hidden code in examples
#[doc(hidden)]
pub async fn connect<T>(url: T) -> crate::Result<MySqlConnection>
where
T: TryInto<Url, Error = crate::Error>
where
T: TryInto<Url, Error = crate::Error>,
{
MySqlConnection::open(url.try_into()).await
}

View file

@ -208,39 +208,42 @@ fn conn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>, pool_tx: &Sender<Idle<D
where
DB::Connection: Connection<Database = DB>,
{
if pool.options.max_lifetime.is_some() || pool.options.idle_timeout.is_some() {
let pool = pool.clone();
let pool_tx = pool_tx.clone();
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
let reap_period = cmp::min(pool.options.max_lifetime, pool.options.idle_timeout)
.expect("one of max_lifetime/idle_timeout should be `Some` at this point");
(Some(a), Some(b)) => cmp::min(a, b),
task::spawn(async move {
while !pool.closed.load(Ordering::AcqRel) {
// reap at most the current size minus the minimum idle
let max_reaped = pool
.size
.load(Ordering::Acquire)
.saturating_sub(pool.options.min_idle);
(None, None) => return,
};
// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
.partition::<Vec<_>, _>(|conn| should_reap(conn, &pool.options));
let pool = pool.clone();
let pool_tx = pool_tx.clone();
for conn in keep {
// return these connections to the pool first
pool_tx.send(conn).await;
}
task::spawn(async move {
while !pool.closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool
.size
.load(Ordering::Acquire)
.saturating_sub(pool.options.min_idle);
for conn in reap {
conn.close().await;
pool.size.fetch_sub(1, Ordering::AcqRel);
}
// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
.partition::<Vec<_>, _>(|conn| should_reap(conn, &pool.options));
task::sleep(reap_period).await;
for conn in keep {
// return these connections to the pool first
pool_tx.send(conn).await;
}
});
}
for conn in reap {
conn.close().await;
pool.size.fetch_sub(1, Ordering::AcqRel);
}
task::sleep(period).await;
}
});
}

View file

@ -25,8 +25,8 @@ pub type PgPool = super::Pool<Postgres>;
// used in tests and hidden code in examples
#[doc(hidden)]
pub async fn connect<T>(url: T) -> crate::Result<PgConnection>
where
T: TryInto<Url, Error = crate::Error>
where
T: TryInto<Url, Error = crate::Error>,
{
PgConnection::open(url.try_into()).await
}

View file

@ -7,10 +7,10 @@ use crate::row::FromRow;
use crate::types::HasSqlType;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::TryFutureExt;
use futures_util::TryStreamExt;
use std::marker::PhantomData;
use futures_core::Stream;
/// Dynamic SQL query with bind parameters. Returned by [query].
pub struct Query<'q, DB, T = <DB as Database>::Arguments>
@ -31,13 +31,15 @@ where
where
E: Executor<Database = DB>,
{
executor.execute(self.query, self.arguments.into_arguments()).await
executor
.execute(self.query, self.arguments.into_arguments())
.await
}
pub fn fetch<'e, E>(self, executor: &'e mut E) -> BoxStream<'e, crate::Result<DB::Row>>
where
E: Executor<Database = DB>,
'q: 'e
'q: 'e,
{
executor.fetch(self.query, self.arguments.into_arguments())
}
@ -46,7 +48,8 @@ where
where
E: Executor<Database = DB>,
{
executor.fetch(self.query, self.arguments.into_arguments())
executor
.fetch(self.query, self.arguments.into_arguments())
.try_collect()
.await
}
@ -55,18 +58,18 @@ where
where
E: Executor<Database = DB>,
{
executor
.fetch_optional(self.query, self.arguments.into_arguments())
.await
executor
.fetch_optional(self.query, self.arguments.into_arguments())
.await
}
pub async fn fetch_one<E>(self, executor: &mut E) -> crate::Result<DB::Row>
where
E: Executor<Database = DB>,
{
executor
.fetch_one(self.query, self.arguments.into_arguments())
.await
executor
.fetch_one(self.query, self.arguments.into_arguments())
.await
}
}

View file

@ -3,8 +3,11 @@ use std::marker::PhantomData;
use futures_core::{future::BoxFuture, stream::BoxStream, Stream};
use futures_util::{future, FutureExt, TryFutureExt, TryStreamExt};
use crate::{arguments::IntoArguments, database::Database, decode::Decode, encode::Encode, executor::Executor, row::FromRow, types::HasSqlType};
use crate::arguments::Arguments;
use crate::{
arguments::IntoArguments, database::Database, decode::Decode, encode::Encode,
executor::Executor, row::FromRow, types::HasSqlType,
};
use std::task::Poll;
/// SQL query with bind parameters and output type. Returned by [query_as] and [query!] *et al*.
@ -19,15 +22,17 @@ where
/// The result of [query!] for SQL queries that do not return results.
impl<DB, P> QueryAs<'_, DB, (), P>
where
DB: Database,
P: IntoArguments<DB> + Send,
where
DB: Database,
P: IntoArguments<DB> + Send,
{
pub async fn execute<E>(self, executor: &mut E) -> crate::Result<u64>
where
E: Executor<Database = DB>,
where
E: Executor<Database = DB>,
{
executor.execute(self.query, self.args.into_arguments()).await
executor
.execute(self.query, self.args.into_arguments())
.await
}
}
@ -42,8 +47,14 @@ where
E: Executor<Database = DB>,
'q: 'e,
{
let Self { query, args, map_row, ..} = self;
executor.fetch(query, args.into_arguments())
let Self {
query,
args,
map_row,
..
} = self;
executor
.fetch(query, args.into_arguments())
.and_then(move |row| future::ready(map_row(row)))
}
@ -58,7 +69,8 @@ where
where
E: Executor<Database = DB>,
{
executor.fetch_optional(self.query, self.args.into_arguments())
executor
.fetch_optional(self.query, self.args.into_arguments())
.await?
.map(self.map_row)
.transpose()
@ -68,14 +80,18 @@ where
where
E: Executor<Database = DB>,
{
(self.map_row)(executor.fetch_one(self.query, self.args.into_arguments()).await?)
(self.map_row)(
executor
.fetch_one(self.query, self.args.into_arguments())
.await?,
)
}
}
impl<'q, DB, R> QueryAs<'q, DB, R>
where
DB: Database,
DB::Arguments: Arguments<Database = DB>
DB::Arguments: Arguments<Database = DB>,
{
/// Bind a value for use with this SQL query.
///
@ -95,7 +111,10 @@ where
// used by query!() and friends
#[doc(hidden)]
pub fn bind_all<I>(self, values: I) -> QueryAs<'q, DB, R, I> where I: IntoArguments<DB> {
pub fn bind_all<I>(self, values: I) -> QueryAs<'q, DB, R, I>
where
I: IntoArguments<DB>,
{
QueryAs {
query: self.query,
args: values,
@ -114,18 +133,21 @@ where
QueryAs {
query,
args: Default::default(),
map_row: |row| Ok(T::from_row(row))
map_row: |row| Ok(T::from_row(row)),
}
}
#[doc(hidden)]
pub fn query_as_mapped<DB, T>(query: &str, map_row: fn(DB::Row) -> crate::Result<T>) -> QueryAs<DB, T>
where
DB: Database,
pub fn query_as_mapped<DB, T>(
query: &str,
map_row: fn(DB::Row) -> crate::Result<T>,
) -> QueryAs<DB, T>
where
DB: Database,
{
QueryAs {
query,
args: Default::default(),
map_row
map_row,
}
}

View file

@ -13,10 +13,13 @@ impl_database_ext! {
#[cfg(feature = "chrono")]
sqlx::types::chrono::NaiveTime,
#[cfg(feature = "chrono")]
sqlx::types::chrono::NaiveDate,
#[cfg(feature = "chrono")]
sqlx::types::chrono::NaiveDateTime,
#[cfg(feature = "chrono")]
sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc> | sqlx::types::chrono::DateTime<_>,
},

View file

@ -101,4 +101,4 @@ pub fn query_as(input: TokenStream) -> TokenStream {
pub fn query_file_as(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as QueryAsMacroInput);
async_macro!(db => expand_query_file_as(input, db))
}
}

View file

@ -2,19 +2,22 @@ use std::path::Path;
use proc_macro2::TokenStream;
use quote::{quote, quote_spanned, ToTokens};
use syn::Expr;
use syn::spanned::Spanned;
use syn::Expr;
use sqlx::describe::Describe;
use crate::database::{DatabaseExt, ParamChecking};
use crate::query_macros::QueryMacroInput;
pub fn quote_args<DB: DatabaseExt>(input: &QueryMacroInput, describe: &Describe<DB>) -> crate::Result<TokenStream> {
pub fn quote_args<DB: DatabaseExt>(
input: &QueryMacroInput,
describe: &Describe<DB>,
) -> crate::Result<TokenStream> {
if input.args.is_empty() {
return Ok(quote! {
let args = ();
})
});
}
let args_check = if DB::PARAM_CHECKING == ParamChecking::Strong {

View file

@ -2,13 +2,13 @@ use std::env;
use async_std::fs;
use proc_macro2::Span;
use syn::{Expr, ExprLit, ExprPath, Path, Lit};
use syn::parse::{Parse, ParseStream};
use syn::punctuated::Punctuated;
use syn::Token;
use syn::{Expr, ExprLit, ExprPath, Lit, Path};
use sqlx::Connection;
use sqlx::describe::Describe;
use sqlx::Connection;
/// Macro input shared by `query!()` and `query_file!()`
pub struct QueryMacroInput {
@ -21,8 +21,8 @@ impl QueryMacroInput {
fn from_exprs(input: ParseStream, mut args: impl Iterator<Item = Expr>) -> syn::Result<Self> {
let sql = match args.next() {
Some(Expr::Lit(ExprLit {
lit: Lit::Str(sql), ..
})) => sql,
lit: Lit::Str(sql), ..
})) => sql,
Some(other_expr) => {
return Err(syn::Error::new_spanned(
other_expr,
@ -42,15 +42,15 @@ impl QueryMacroInput {
pub async fn expand_file_src(self) -> syn::Result<Self> {
let source = read_file_src(&self.source, self.source_span).await?;
Ok(Self {
source,
..self
})
Ok(Self { source, ..self })
}
/// Run a parse/describe on the query described by this input and validate that it matches the
/// passed number of args
pub async fn describe_validate<C: Connection>(&self, conn: &mut C) -> crate::Result<Describe<C::Database>> {
pub async fn describe_validate<C: Connection>(
&self,
conn: &mut C,
) -> crate::Result<Describe<C::Database>> {
let describe = conn
.describe(&self.source)
.await
@ -65,7 +65,7 @@ impl QueryMacroInput {
self.args.len()
),
)
.into());
.into());
}
Ok(describe)
@ -83,7 +83,7 @@ impl Parse for QueryMacroInput {
/// Macro input shared by `query_as!()` and `query_file_as!()`
pub struct QueryAsMacroInput {
pub(super) as_ty: ExprPath,
pub(super) query_input: QueryMacroInput
pub(super) query_input: QueryMacroInput,
}
impl QueryAsMacroInput {
@ -106,12 +106,13 @@ impl Parse for QueryAsMacroInput {
other_expr,
"expected path to a type",
));
},
}
None => return Err(input.error("expected path to SQL file")),
};
Ok(QueryAsMacroInput {
as_ty, query_input: QueryMacroInput::from_exprs(input, args)?,
as_ty,
query_input: QueryMacroInput::from_exprs(input, args)?,
})
}
}
@ -122,26 +123,44 @@ async fn read_file_src(source: &str, source_span: Span) -> syn::Result<String> {
let path = Path::new(source);
if path.is_absolute() {
return Err(syn::Error::new(source_span,
"absolute paths will only work on the current machine"));
return Err(syn::Error::new(
source_span,
"absolute paths will only work on the current machine",
));
}
// requires `proc_macro::SourceFile::path()` to be stable
// https://github.com/rust-lang/rust/issues/54725
if path.is_relative() && !path.parent().map_or(false, |parent| !parent.as_os_str().is_empty()) {
return Err(syn::Error::new(source_span,
"paths relative to the current file's directory are not currently supported"));
if path.is_relative()
&& !path
.parent()
.map_or(false, |parent| !parent.as_os_str().is_empty())
{
return Err(syn::Error::new(
source_span,
"paths relative to the current file's directory are not currently supported",
));
}
let base_dir = env::var("CARGO_MANIFEST_DIR")
.map_err(|_| syn::Error::new(source_span,
"CARGO_MANIFEST_DIR is not set; please use Cargo to build"))?;
let base_dir = env::var("CARGO_MANIFEST_DIR").map_err(|_| {
syn::Error::new(
source_span,
"CARGO_MANIFEST_DIR is not set; please use Cargo to build",
)
})?;
let base_dir_path = Path::new(&base_dir);
let file_path = base_dir_path.join(path);
fs::read_to_string(&file_path)
.await
.map_err(|e| syn::Error::new(source_span, format!("failed to read query file at {}: {}", file_path.display(), e)))
}
fs::read_to_string(&file_path).await.map_err(|e| {
syn::Error::new(
source_span,
format!(
"failed to read query file at {}: {}",
file_path.display(),
e
),
)
})
}

View file

@ -1,6 +1,6 @@
use std::fmt::Display;
use proc_macro2::{TokenStream, Span, Ident};
use proc_macro2::{Ident, Span, TokenStream};
use quote::{format_ident, quote};
pub use input::{QueryAsMacroInput, QueryMacroInput};
@ -9,21 +9,21 @@ pub use query::expand_query;
use crate::database::DatabaseExt;
use sqlx::describe::Describe;
use sqlx::Connection;
use sqlx::types::HasTypeMetadata;
use sqlx::Connection;
mod args;
mod input;
mod output;
mod args;
mod query;
pub async fn expand_query_file<C: Connection>(
input: QueryMacroInput,
conn: C,
) -> crate::Result<TokenStream>
where
C::Database: DatabaseExt + Sized,
<C::Database as HasTypeMetadata>::TypeId: Display,
where
C::Database: DatabaseExt + Sized,
<C::Database as HasTypeMetadata>::TypeId: Display,
{
expand_query(input.expand_file_src().await?, conn).await
}
@ -32,21 +32,28 @@ pub async fn expand_query_as<C: Connection>(
input: QueryAsMacroInput,
mut conn: C,
) -> crate::Result<TokenStream>
where
where
C::Database: DatabaseExt + Sized,
<C::Database as HasTypeMetadata>::TypeId: Display,
{
let describe = input.query_input.describe_validate(&mut conn).await?;
if describe.result_columns.is_empty() {
return Err(syn::Error::new(input.query_input.source_span,
"query must output at least one column").into());
return Err(syn::Error::new(
input.query_input.source_span,
"query must output at least one column",
)
.into());
}
let args_tokens = args::quote_args(&input.query_input, &describe)?;
let columns = output::columns_to_rust(&describe)?;
let output = output::quote_query_as::<C::Database>(&input.query_input.source, &input.as_ty.path, &columns);
let output = output::quote_query_as::<C::Database>(
&input.query_input.source,
&input.as_ty.path,
&columns,
);
Ok(quote! {{
#args_tokens
@ -58,9 +65,9 @@ pub async fn expand_query_file_as<C: Connection>(
input: QueryAsMacroInput,
conn: C,
) -> crate::Result<TokenStream>
where
C::Database: DatabaseExt + Sized,
<C::Database as HasTypeMetadata>::TypeId: Display,
where
C::Database: DatabaseExt + Sized,
<C::Database as HasTypeMetadata>::TypeId: Display,
{
expand_query_as(input.expand_file_src().await?, conn).await
}

View file

@ -1,6 +1,6 @@
use proc_macro2::{Ident, TokenStream};
use syn::{Path, Token};
use quote::quote;
use syn::{Path, Token};
use sqlx::describe::Describe;
@ -8,7 +8,7 @@ use crate::database::DatabaseExt;
pub struct RustColumn {
pub(super) ident: Ident,
pub(super) type_: TokenStream
pub(super) type_: TokenStream,
}
pub fn columns_to_rust<DB: DatabaseExt>(describe: &Describe<DB>) -> crate::Result<Vec<RustColumn>> {
@ -35,13 +35,21 @@ pub fn columns_to_rust<DB: DatabaseExt>(describe: &Describe<DB>) -> crate::Resul
.collect::<crate::Result<Vec<_>>>()
}
pub fn quote_query_as<DB: DatabaseExt>(sql: &str, out_ty: &Path, columns: &[RustColumn]) -> TokenStream {
let instantiations = columns
.iter()
.enumerate()
.map(|(i, &RustColumn { ref ident, ref type_, .. })| {
quote!( #ident: #i.try_get::<#type_>(&row).try_unwrap_optional()? )
});
pub fn quote_query_as<DB: DatabaseExt>(
sql: &str,
out_ty: &Path,
columns: &[RustColumn],
) -> TokenStream {
let instantiations = columns.iter().enumerate().map(
|(
i,
&RustColumn {
ref ident,
ref type_,
..
},
)| { quote!( #ident: #i.try_get::<#type_>(&row).try_unwrap_optional()? ) },
);
let db_path = DB::quotable_path();

View file

@ -2,13 +2,18 @@ use std::fmt::Display;
use proc_macro2::Span;
use proc_macro2::TokenStream;
use syn::{Expr, ExprLit, Ident, Lit, parse::{self, Parse, ParseStream}, punctuated::Punctuated, spanned::Spanned, Token, Path};
use syn::{
parse::{self, Parse, ParseStream},
punctuated::Punctuated,
spanned::Spanned,
Expr, ExprLit, Ident, Lit, Path, Token,
};
use quote::{format_ident, quote, quote_spanned, ToTokens};
use sqlx::{Connection, describe::Describe, types::HasTypeMetadata};
use sqlx::{describe::Describe, types::HasTypeMetadata, Connection};
use crate::database::{DatabaseExt, ParamChecking};
use super::{args, output, QueryMacroInput};
use crate::database::{DatabaseExt, ParamChecking};
/// Given an input like `query!("SELECT * FROM accounts WHERE account_id > ?", account_id)`,
/// expand to an anonymous record
@ -41,7 +46,12 @@ where
let record_fields = columns
.iter()
.map(|&output::RustColumn { ref ident, ref type_}| quote!(#ident: #type_,))
.map(
|&output::RustColumn {
ref ident,
ref type_,
}| quote!(#ident: #type_,),
)
.collect::<TokenStream>();
let output = output::quote_query_as::<C::Database>(sql, &record_type, &columns);

View file

@ -4,7 +4,9 @@
pub use sqlx_core::{arguments, decode, describe, encode, error, pool, row, types};
// Types
pub use sqlx_core::{Connection, Database, Error, Executor, FromRow, Pool, Query, QueryAs, Result, Row};
pub use sqlx_core::{
Connection, Database, Error, Executor, FromRow, Pool, Query, QueryAs, Result, Row,
};
// Functions
pub use sqlx_core::{query, query_as};

View file

@ -57,13 +57,12 @@ async fn macro_select_from_cte() -> anyhow::Result<()> {
select * from accounts where id = ?",
1i32
)
.fetch_one(&mut conn)
.await?;
.fetch_one(&mut conn)
.await?;
println!("{:?}", account);
println!("{}: {}", account.id, account.name);
Ok(())
}

View file

@ -1,11 +1,13 @@
#[async_std::test]
async fn test_query() -> sqlx::Result<()> {
let mut conn =
sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let mut conn = sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let account = sqlx::query!("SELECT * from (VALUES (1, 'Herp Derpinson')) accounts(id, name) where id = $1", 1i32)
.fetch_one(&mut conn)
.await?;
let account = sqlx::query!(
"SELECT * from (VALUES (1, 'Herp Derpinson')) accounts(id, name) where id = $1",
1i32
)
.fetch_one(&mut conn)
.await?;
println!("account ID: {:?}", account.id);
@ -14,11 +16,11 @@ async fn test_query() -> sqlx::Result<()> {
#[async_std::test]
async fn test_query_file() -> sqlx::Result<()> {
let mut conn =
sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let mut conn = sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let account = sqlx::query_file!("tests/test-query.sql")
.fetch_one(&mut conn).await?;
.fetch_one(&mut conn)
.await?;
println!("{:?}", account);
@ -33,11 +35,14 @@ struct Account {
#[async_std::test]
async fn test_query_as() -> sqlx::Result<()> {
let mut conn =
sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let mut conn = sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let account = sqlx::query_as!(Account, "SELECT * from (VALUES (1, null)) accounts(id, name)")
.fetch_one(&mut conn).await?;
let account = sqlx::query_as!(
Account,
"SELECT * from (VALUES (1, null)) accounts(id, name)"
)
.fetch_one(&mut conn)
.await?;
assert_eq!(None, account.name);
@ -48,11 +53,11 @@ async fn test_query_as() -> sqlx::Result<()> {
#[async_std::test]
async fn test_query_file_as() -> sqlx::Result<()> {
let mut conn =
sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let mut conn = sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let account = sqlx::query_file_as!(Account, "tests/test-query.sql")
.fetch_one(&mut conn).await?;
.fetch_one(&mut conn)
.await?;
println!("{:?}", account);
@ -67,13 +72,15 @@ async fn test_nullable_err() -> sqlx::Result<()> {
name: String,
}
let mut conn =
sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let mut conn = sqlx::postgres::connect(&dotenv::var("DATABASE_URL").unwrap()).await?;
let err = sqlx::query_as!(Account, "SELECT * from (VALUES (1, null::text)) accounts(id, name)")
.fetch_one(&mut conn)
.await
.unwrap_err();
let err = sqlx::query_as!(
Account,
"SELECT * from (VALUES (1, null::text)) accounts(id, name)"
)
.fetch_one(&mut conn)
.await
.unwrap_err();
if let sqlx::Error::Decode(sqlx::decode::DecodeError::UnexpectedNull) = err {
Ok(())