Enhance Executor definition and remove inherent Executor methods on Pool and Connection

This commit is contained in:
Ryan Leckey 2019-08-28 08:48:18 -07:00
parent ee30296e32
commit f67421b50d
8 changed files with 259 additions and 230 deletions

View file

@ -2,12 +2,15 @@ use crate::{connection::RawConnection, query::QueryParameters, row::Row};
/// A database backend.
///
/// This trait is used to both allow distinct implementations of traits (
/// e.g., implementing `ToSql for Uuid` differently for MySQL and Postgres) and
/// to query capabilities within a database backend (e.g., with a specific
/// `Connection` can we `bind` a `i64`?).
/// This trait represents the concept of a backend (e.g. "MySQL" vs "SQLite").
pub trait Backend: Sized {
/// The concrete `QueryParameters` implementation for this backend.
type QueryParameters: QueryParameters<Backend = Self>;
/// The concrete `RawConnection` implementation for this backend.
type RawConnection: RawConnection<Backend = Self>;
/// The concrete `Row` implementation for this backend. This type is returned
/// from methods in the `RawConnection`.
type Row: Row<Backend = Self>;
}

View file

@ -19,7 +19,12 @@ use std::{
},
};
/// A connection to the database.
///
/// This trait is not intended to be used directly. Instead [sqlx::Connection] or [sqlx::Pool] should be used instead, which provide
/// concurrent access and typed retrieval of results.
pub trait RawConnection: Send {
/// The database backend this type connects to.
type Backend: Backend;
/// Establish a new connection to the database server.
@ -28,6 +33,7 @@ pub trait RawConnection: Send {
Self: Sized;
/// Release resources for this database connection immediately.
///
/// This method is not required to be called. A database server will eventually notice
/// and clean up not fully closed connections.
fn finalize<'c>(&'c mut self) -> BoxFuture<'c, Result<(), Error>>;
@ -83,40 +89,6 @@ where
async fn get(&self) -> ConnectionFairy<'_, DB> {
ConnectionFairy::new(&self.0, self.0.acquire().await)
}
#[inline]
pub async fn execute<A>(&self, query: &str, params: A) -> Result<u64, Error>
where
A: IntoQueryParameters<DB>,
{
Executor::execute(self, query, params.into()).await
}
#[inline]
pub fn fetch<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB> + Send + Unpin,
{
Executor::fetch(self, query, params.into())
}
#[inline]
pub async fn fetch_optional<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> Result<Option<T>, Error>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB>,
{
Executor::fetch_optional(self, query, params.into()).await
}
}
impl<DB> Executor for Connection<DB>
@ -125,28 +97,32 @@ where
{
type Backend = DB;
fn execute<'c, 'q: 'c>(
fn execute<'c, 'q: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
) -> BoxFuture<'c, Result<u64, Error>> {
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let mut conn = self.get().await;
conn.execute(query, params).await
conn.execute(query, params.into()).await
})
}
fn fetch<'c, 'q: 'c, T: 'c>(
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut conn = self.get().await;
let mut s = conn.fetch(query, params);
let mut s = conn.fetch(query, params.into());
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(row);
@ -154,17 +130,18 @@ where
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c>(
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend>,
{
Box::pin(async move {
let mut conn = self.get().await;
let row = conn.fetch_optional(query, params).await?;
let row = conn.fetch_optional(query, params.into()).await?;
Ok(row.map(T::from_row))
})

View file

@ -1,31 +1,40 @@
use crate::{backend::Backend, error::Error, query::QueryParameters, row::FromSqlRow};
use crate::{
backend::Backend,
error::Error,
query::{IntoQueryParameters, QueryParameters},
row::FromSqlRow,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use std::io;
pub trait Executor: Send {
type Backend: Backend;
fn execute<'c, 'q: 'c>(
fn execute<'c, 'q: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
) -> BoxFuture<'c, Result<u64, Error>>;
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send;
fn fetch<'c, 'q: 'c, T: 'c>(
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send + Unpin;
fn fetch_optional<'c, 'q: 'c, T: 'c>(
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
T: FromSqlRow<Self::Backend>;
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send;
}
impl<'e, E> Executor for &'e E
@ -35,32 +44,37 @@ where
type Backend = E::Backend;
#[inline]
fn execute<'c, 'q: 'c>(
fn execute<'c, 'q: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
) -> BoxFuture<'c, Result<u64, Error>> {
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
(*self).execute(query, params)
}
fn fetch<'c, 'q: 'c, T: 'c>(
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send + Unpin,
{
(*self).fetch(query, params)
}
fn fetch_optional<'c, 'q: 'c, T: 'c>(
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
T: FromSqlRow<Self::Backend>,
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send,
{
(*self).fetch_optional(query, params)
}

View file

@ -21,15 +21,20 @@ mod connection;
pub mod error;
mod executor;
mod pool;
mod query;
#[macro_use]
pub mod query;
pub mod serialize;
mod sql;
pub mod types;
pub use self::{
connection::Connection,
error::Error,
executor::Executor,
pool::Pool,
query::{query, SqlQuery},
sql::{query, SqlQuery},
};
#[cfg(feature = "mariadb")]

View file

@ -62,40 +62,6 @@ where
},
}))
}
#[inline]
pub async fn execute<A>(&self, query: &str, params: A) -> Result<u64, Error>
where
A: IntoQueryParameters<DB>,
{
Executor::execute(self, query, params.into()).await
}
#[inline]
pub fn fetch<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB> + Send + Unpin,
{
Executor::fetch(self, query, params.into())
}
#[inline]
pub async fn fetch_optional<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> Result<Option<T>, Error>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB>,
{
Executor::fetch_optional(self, query, params.into()).await
}
}
struct SharedPool<DB>
@ -167,31 +133,35 @@ where
{
type Backend = DB;
fn execute<'c, 'q: 'c>(
fn execute<'c, 'q: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
) -> BoxFuture<'c, Result<u64, Error>> {
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let live = self.0.acquire().await?;
let mut conn = PooledConnection::new(&self.0, live);
conn.execute(query, params).await
conn.execute(query, params.into()).await
})
}
fn fetch<'c, 'q: 'c, T: 'c>(
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let live = self.0.acquire().await?;
let mut conn = PooledConnection::new(&self.0, live);
let mut s = conn.fetch(query, params);
let mut s = conn.fetch(query, params.into());
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(row);
@ -199,18 +169,19 @@ where
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c>(
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c self,
query: &'q str,
params: <Self::Backend as Backend>::QueryParameters,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromSqlRow<Self::Backend>,
{
Box::pin(async move {
let live = self.0.acquire().await?;
let mut conn = PooledConnection::new(&self.0, live);
let row = conn.fetch_optional(query, params).await?;
let row = conn.fetch_optional(query, params.into()).await?;
Ok(row.map(T::from_row))
})

View file

@ -8,5 +8,6 @@ impl Backend for Postgres {
type Row = super::PostgresRow;
}
// Generates tuple FromSqlRow impls for this backend
// Generates tuple impls for this backend
impl_from_sql_row_tuples_for_backend!(Postgres);
impl_into_query_parameters_for_backend!(Postgres);

View file

@ -27,138 +27,116 @@ where
#[allow(unused)]
macro_rules! impl_into_query_parameters {
($( ($idx:tt) -> $T:ident );+;) => {
impl<$($T,)+ DB> IntoQueryParameters<DB> for ($($T,)+)
($B:ident: $( ($idx:tt) -> $T:ident );+;) => {
impl<$($T,)+> crate::query::IntoQueryParameters<$B> for ($($T,)+)
where
DB: Backend,
$(DB: crate::types::HasSqlType<$T>,)+
$($T: crate::serialize::ToSql<DB>,)+
$($B: crate::types::HasSqlType<$T>,)+
$($T: crate::serialize::ToSql<$B>,)+
{
fn into(self) -> DB::QueryParameters {
let mut params = DB::QueryParameters::new();
$(params.bind(self.$idx);)+
fn into(self) -> <$B as crate::backend::Backend>::QueryParameters {
let mut params = <<$B as crate::backend::Backend>::QueryParameters
as crate::query::QueryParameters>::new();
$(crate::query::QueryParameters::bind(&mut params, self.$idx);)+
params
}
}
};
}
impl<DB> IntoQueryParameters<DB> for ()
impl<DB> IntoQueryParameters<DB> for DB::QueryParameters
where
DB: Backend,
{
#[inline]
fn into(self) -> DB::QueryParameters {
DB::QueryParameters::new()
}
}
impl_into_query_parameters!(
(0) -> T1;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
);
pub struct SqlQuery<'q, DB>
where
DB: Backend,
{
query: &'q str,
params: DB::QueryParameters,
}
impl<'q, DB> SqlQuery<'q, DB>
where
DB: Backend,
{
#[inline]
pub fn new(query: &'q str) -> Self {
Self {
query,
params: DB::QueryParameters::new(),
}
}
#[inline]
pub fn bind<T>(mut self, value: T) -> Self
where
DB: HasSqlType<T>,
T: ToSql<DB>,
{
self.params.bind(value);
self
}
// TODO: These methods should go on a [Execute] trait (so more execut-able things can be defined)
#[inline]
pub fn execute<E>(self, executor: &'q E) -> BoxFuture<'q, Result<u64, Error>>
where
E: Executor<Backend = DB>,
{
executor.execute(self.query, self.params)
}
#[inline]
pub fn fetch<E, T: 'q>(self, executor: &'q E) -> BoxStream<'q, Result<T, Error>>
where
E: Executor<Backend = DB>,
T: FromSqlRow<DB> + Send + Unpin,
{
executor.fetch(self.query, self.params)
}
#[inline]
pub fn fetch_optional<E, T: 'q>(
self,
executor: &'q E,
) -> BoxFuture<'q, Result<Option<T>, Error>>
where
E: Executor<Backend = DB>,
T: FromSqlRow<DB>,
{
executor.fetch_optional(self.query, self.params)
}
}
/// Construct a full SQL query using raw SQL.
#[inline]
pub fn query<'q, DB>(query: &'q str) -> SqlQuery<'q, DB>
where
DB: Backend,
{
SqlQuery::new(query)
#[allow(unused)]
macro_rules! impl_into_query_parameters_for_backend {
($B:ident) => {
impl crate::query::IntoQueryParameters<$B> for ()
{
#[inline]
fn into(self) -> <$B as crate::backend::Backend>::QueryParameters {
<<$B as crate::backend::Backend>::QueryParameters
as crate::query::QueryParameters>::new()
}
}
impl_into_query_parameters!($B:
(0) -> T1;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
(6) -> T7;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
(6) -> T7;
(7) -> T8;
);
impl_into_query_parameters!($B:
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
(6) -> T7;
(7) -> T8;
(8) -> T9;
);
}
}

80
src/sql.rs Normal file
View file

@ -0,0 +1,80 @@
use crate::{
backend::Backend, error::Error, executor::Executor, query::QueryParameters, row::FromSqlRow,
serialize::ToSql, types::HasSqlType,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use std::io;
pub struct SqlQuery<'q, DB>
where
DB: Backend,
{
query: &'q str,
params: DB::QueryParameters,
}
impl<'q, DB> SqlQuery<'q, DB>
where
DB: Backend,
{
#[inline]
pub fn new(query: &'q str) -> Self {
Self {
query,
params: DB::QueryParameters::new(),
}
}
#[inline]
pub fn bind<T>(mut self, value: T) -> Self
where
DB: HasSqlType<T>,
T: ToSql<DB>,
{
self.params.bind(value);
self
}
// TODO: These methods should go on a [Execute] trait (so more execut-able things can be defined)
#[inline]
pub fn execute<E>(self, executor: &'q E) -> BoxFuture<'q, Result<u64, Error>>
where
E: Executor<Backend = DB>,
DB::QueryParameters: 'q,
{
executor.execute(self.query, self.params)
}
#[inline]
pub fn fetch<E, T: 'q>(self, executor: &'q E) -> BoxStream<'q, Result<T, Error>>
where
E: Executor<Backend = DB>,
T: FromSqlRow<DB> + Send + Unpin,
DB::QueryParameters: 'q,
{
executor.fetch(self.query, self.params)
}
#[inline]
pub fn fetch_optional<E, T: 'q>(
self,
executor: &'q E,
) -> BoxFuture<'q, Result<Option<T>, Error>>
where
E: Executor<Backend = DB>,
T: FromSqlRow<DB> + Send,
DB::QueryParameters: 'q,
{
executor.fetch_optional(self.query, self.params)
}
}
/// Construct a full SQL query using raw SQL.
#[inline]
pub fn query<'q, DB>(query: &'q str) -> SqlQuery<'q, DB>
where
DB: Backend,
{
SqlQuery::new(query)
}