generalize MaybeOwnedConnection and clean up the connection file

This commit is contained in:
Ryan Leckey 2020-03-13 02:21:01 -07:00
parent 444ffff127
commit 751efdf31b
13 changed files with 110 additions and 65 deletions

View file

@ -3,6 +3,7 @@ use std::convert::TryInto;
use futures_core::future::BoxFuture;
use crate::executor::Executor;
use crate::maybe_owned::MaybeOwned;
use crate::pool::{Pool, PoolConnection};
use crate::transaction::Transaction;
use crate::url::Url;
@ -42,63 +43,32 @@ pub trait Connect: Connection {
Self: Sized;
}
mod internal {
#[allow(dead_code)]
pub enum MaybeOwnedConnection<'c, C>
where
C: super::Connect,
{
Borrowed(&'c mut C),
Owned(super::PoolConnection<C>),
}
pub(crate) enum ConnectionSource<'c, C>
where
C: Connect,
{
Connection(MaybeOwned<'c, PoolConnection<C>, C>),
#[allow(dead_code)]
pub enum ConnectionSource<'c, C>
where
C: super::Connect,
{
Connection(MaybeOwnedConnection<'c, C>),
Pool(super::Pool<C>),
}
Pool(Pool<C>),
}
pub(crate) use self::internal::{ConnectionSource, MaybeOwnedConnection};
impl<'c, C> ConnectionSource<'c, C>
where
C: Connect,
{
#[allow(dead_code)]
pub(crate) async fn resolve_by_ref(&mut self) -> crate::Result<&'_ mut C> {
pub(crate) async fn resolve(&mut self) -> crate::Result<&'_ mut C> {
if let ConnectionSource::Pool(pool) = self {
*self =
ConnectionSource::Connection(MaybeOwnedConnection::Owned(pool.acquire().await?));
*self = ConnectionSource::Connection(MaybeOwned::Owned(pool.acquire().await?));
}
Ok(match self {
ConnectionSource::Connection(conn) => match conn {
MaybeOwnedConnection::Borrowed(conn) => &mut *conn,
MaybeOwnedConnection::Owned(ref mut conn) => conn,
MaybeOwned::Borrowed(conn) => &mut *conn,
MaybeOwned::Owned(ref mut conn) => conn,
},
ConnectionSource::Pool(_) => unreachable!(),
})
}
}
impl<'c, C> From<&'c mut C> for MaybeOwnedConnection<'c, C>
where
C: Connect,
{
fn from(conn: &'c mut C) -> Self {
MaybeOwnedConnection::Borrowed(conn)
}
}
impl<'c, C> From<PoolConnection<C>> for MaybeOwnedConnection<'c, C>
where
C: Connect,
{
fn from(conn: PoolConnection<C>) -> Self {
MaybeOwnedConnection::Owned(conn)
}
}

View file

@ -1,6 +1,5 @@
use futures_core::future::BoxFuture;
use crate::connection::{Connect, MaybeOwnedConnection};
use crate::database::{Database, HasRow};
use crate::executor::Execute;
use crate::pool::Pool;
@ -19,18 +18,17 @@ where
{
type Database: Database;
#[doc(hidden)]
fn from_pool<E>(pool: &Pool<<Self::Database as Database>::Connection>, query: E) -> Self
where
Self: Sized,
E: Execute<'q, Self::Database>;
#[doc(hidden)]
fn from_connection<E, C>(conn: C, query: E) -> Self
fn from_connection<E>(
connection: &'c mut <Self::Database as Database>::Connection,
query: E,
) -> Self
where
Self: Sized,
<Self::Database as Database>::Connection: Connect,
C: Into<MaybeOwnedConnection<'c, <Self::Database as Database>::Connection>>,
E: Execute<'q, Self::Database>;
/// Fetch the next row in the result. Returns `None` if there are no more rows.

View file

@ -24,7 +24,10 @@ where
/// discarding any potential result rows.
///
/// Returns the number of rows affected, or 0 if not applicable.
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>;
@ -89,7 +92,10 @@ where
{
type Database = T::Database;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{

View file

@ -45,6 +45,7 @@ where
}
}
#[cfg(feature = "postgres")]
#[inline]
pub fn buffer<'c>(&'c self) -> &'c [u8] {
&self.rbuf[self.rbuf_rindex..]

View file

@ -15,6 +15,8 @@ pub mod error;
#[macro_use]
mod io;
mod maybe_owned;
pub mod connection;
pub mod cursor;
pub mod database;

View file

@ -0,0 +1,42 @@
use core::borrow::{Borrow, BorrowMut};
use core::ops::{Deref, DerefMut};
pub(crate) enum MaybeOwned<'a, O, B = O> {
#[allow(dead_code)]
Borrowed(&'a mut B),
#[allow(dead_code)]
Owned(O),
}
impl<'a, O, B> From<&'a mut B> for MaybeOwned<'a, O, B> {
fn from(val: &'a mut B) -> Self {
MaybeOwned::Borrowed(val)
}
}
impl<'a, O, B> Deref for MaybeOwned<'a, O, B>
where
O: Borrow<B>,
{
type Target = B;
fn deref(&self) -> &Self::Target {
match self {
MaybeOwned::Borrowed(val) => val,
MaybeOwned::Owned(ref val) => val.borrow(),
}
}
}
impl<'a, O, B> DerefMut for MaybeOwned<'a, O, B>
where
O: BorrowMut<B>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
MaybeOwned::Borrowed(val) => val,
MaybeOwned::Owned(ref mut val) => val.borrow_mut(),
}
}
}

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use futures_core::future::BoxFuture;
use crate::connection::{ConnectionSource, MaybeOwnedConnection};
use crate::connection::ConnectionSource;
use crate::cursor::Cursor;
use crate::executor::Execute;
use crate::mysql::protocol::{ColumnCount, ColumnDefinition, Row, Status, TypeId};
@ -21,7 +21,6 @@ pub struct MySqlCursor<'c, 'q> {
impl<'c, 'q> Cursor<'c, 'q> for MySqlCursor<'c, 'q> {
type Database = MySql;
#[doc(hidden)]
fn from_pool<E>(pool: &Pool<MySqlConnection>, query: E) -> Self
where
Self: Sized,
@ -36,11 +35,9 @@ impl<'c, 'q> Cursor<'c, 'q> for MySqlCursor<'c, 'q> {
}
}
#[doc(hidden)]
fn from_connection<E, C>(conn: C, query: E) -> Self
fn from_connection<E>(conn: &'c mut MySqlConnection, query: E) -> Self
where
Self: Sized,
C: Into<MaybeOwnedConnection<'c, MySqlConnection>>,
E: Execute<'q, MySql>,
{
Self {
@ -60,7 +57,7 @@ impl<'c, 'q> Cursor<'c, 'q> for MySqlCursor<'c, 'q> {
async fn next<'a, 'c: 'a, 'q: 'a>(
cursor: &'a mut MySqlCursor<'c, 'q>,
) -> crate::Result<Option<MySqlRow<'a>>> {
let mut conn = cursor.source.resolve_by_ref().await?;
let mut conn = cursor.source.resolve().await?;
// The first time [next] is called we need to actually execute our
// contained query. We guard against this happening on _all_ next calls

View file

@ -202,7 +202,10 @@ impl super::MySqlConnection {
impl Executor for super::MySqlConnection {
type Database = MySql;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{

View file

@ -1,4 +1,5 @@
use futures_core::future::BoxFuture;
use std::borrow::{Borrow, BorrowMut};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;
@ -35,6 +36,24 @@ pub(super) struct Floating<'p, C> {
const DEREF_ERR: &str = "(bug) connection already released to pool";
impl<C> Borrow<C> for PoolConnection<C>
where
C: Connect,
{
fn borrow(&self) -> &C {
&*self
}
}
impl<C> BorrowMut<C> for PoolConnection<C>
where
C: Connect,
{
fn borrow_mut(&mut self) -> &mut C {
&mut *self
}
}
impl<C> Deref for PoolConnection<C>
where
C: Connect,

View file

@ -17,7 +17,10 @@ where
{
type Database = DB;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
@ -65,7 +68,10 @@ where
{
type Database = C::Database;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use futures_core::future::BoxFuture;
use crate::connection::{ConnectionSource, MaybeOwnedConnection};
use crate::connection::ConnectionSource;
use crate::cursor::Cursor;
use crate::executor::Execute;
use crate::pool::Pool;
@ -22,7 +22,6 @@ pub struct PgCursor<'c, 'q> {
impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
type Database = Postgres;
#[doc(hidden)]
fn from_pool<E>(pool: &Pool<PgConnection>, query: E) -> Self
where
Self: Sized,
@ -36,11 +35,9 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
}
}
#[doc(hidden)]
fn from_connection<E, C>(conn: C, query: E) -> Self
fn from_connection<E>(conn: &'c mut PgConnection, query: E) -> Self
where
Self: Sized,
C: Into<MaybeOwnedConnection<'c, PgConnection>>,
E: Execute<'q, Postgres>,
{
Self {
@ -128,7 +125,7 @@ async fn get_or_describe(
async fn next<'a, 'c: 'a, 'q: 'a>(
cursor: &'a mut PgCursor<'c, 'q>,
) -> crate::Result<Option<PgRow<'a>>> {
let mut conn = cursor.source.resolve_by_ref().await?;
let mut conn = cursor.source.resolve().await?;
// The first time [next] is called we need to actually execute our
// contained query. We guard against this happening on _all_ next calls

View file

@ -368,7 +368,10 @@ impl PgConnection {
impl Executor for super::PgConnection {
type Database = Postgres;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{

View file

@ -29,6 +29,7 @@ impl<'s> TryFrom<&'s String> for Url {
}
impl Url {
#[allow(dead_code)]
pub(crate) fn as_str(&self) -> &str {
self.0.as_str()
}